业务分析
一般而言,商品秒杀大概可以拆分成以下几步:
- 用户校验 校验是否多次抢单,保证每个商品每个用户只能秒杀一次
- 下单 订单信息进入消息队列,等待消费
- 减少库存 消费订单消息,减少商品库存,增加订单记录
- 付款 十五分钟内完成支付,修改支付状态
创建表
goods_info 商品库存表
列
说明
id
主键(uuid)
goods_name
商品名称
goods_stock
商品库存
package com.jason.seckill.order.entity;
/**
* 商品库存
*/
public class GoodsInfo {
private String id;
private String goodsName;
private String goodsStock;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getGoodsName() {
return goodsName;
}
public void setGoodsName(String goodsName) {
this.goodsName = goodsName;
}
public String getGoodsStock() {
return goodsStock;
}
public void setGoodsStock(String goodsStock) {
this.goodsStock = goodsStock;
}
@Override
public String toString() {
return "GoodsInfo{" +
"id='" + id + '\'' +
", goodsName='" + goodsName + '\'' +
", goodsStock='" + goodsStock + '\'' +
'}';
}
}
order_info 订单记录表
列
说明
id
主键(uuid)
user_id
用户id
goods_id
商品id
pay_status
支付状态(0-超时未支付 1-已支付 2-待支付)
package com.jason.seckill.order.entity;
/**
* 下单记录
*/
public class OrderRecord {
private String id;
private String userId;
private String goodsId;
/**
* 0-超时未支付 1-已支付 2-待支付
*/
private Integer payStatus;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public String getUserId() {
return userId;
}
public void setUserId(String userId) {
this.userId = userId;
}
public String getGoodsId() {
return goodsId;
}
public void setGoodsId(String goodsId) {
this.goodsId = goodsId;
}
public Integer getPayStatus() {
return payStatus;
}
public void setPayStatus(Integer payStatus) {
this.payStatus = payStatus;
}
@Override
public String toString() {
return "OrderRecord{" +
"id='" + id + '\'' +
", userId='" + userId + '\'' +
", goodsId='" + goodsId + '\'' +
'}';
}
}
功能实现
1.用户校验
使用redis做用户校验,保证每个用户每个商品只能抢一次,上代码:
public boolean checkSeckillUser(OrderRequest order) {
String key = env.getProperty("seckill.redis.key.prefix") + order.getUserId() + order.getGoodsId();
return redisTemplate.opsForValue().setIfAbsent(key,"1");
}
userId+orderId的组合作为key,利用redis的setnx分布式锁原理来实现。如果是限时秒杀,可以通过设置key的过期时间来实现。
2.下单
下单信息肯定是要先扔到消息队列里的,这里采用RabbitMQ来做消息队列,先来看一下消息队列的模型图: rabbitmq的配置:
#rabbitmq配置
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
#消费者数量
spring.rabbitmq.listener.simple.concurrency=5
#最大消费者数量
spring.rabbitmq.listener.simple.max-concurrency=10
#消费者每次从队列获取的消息数量。写多了,如果长时间得不到消费,数据就一直得不到处理
spring.rabbitmq.listener.simple.prefetch=1
#消费接收确认机制-手动确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
mq.env=local
#订单处理队列
#交换机名称
order.mq.exchange.name=${mq.env}:order:mq:exchange
#队列名称
order.mq.queue.name=${mq.env}:order:mq:queue
#routingkey
order.mq.routing.key=${mq.env}:order:mq:routing:key
rabbitmq配置类OrderRabbitmqConfig:
/**
* rabbitmq配置
*/
@Configuration
public class OrderRabbitmqConfig {
private static final Logger logger = LoggerFactory.getLogger(OrderRabbitmqConfig.class);
@Autowired
private Environment env;
/**
* channel链接工厂
*/
@Autowired
private CachingConnectionFactory connectionFactory;
/**
* 监听器容器配置
*/
@Autowired
private SimpleRabbitListenerContainerFactoryConfigurer factoryConfigurer;
/**
* 声明rabbittemplate
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(){
//消息发送成功确认,对应application.properties中的spring.rabbitmq.publisher-confirms=true
connectionFactory.setPublisherConfirms(true);
//消息发送失败确认,对应application.properties中的spring.rabbitmq.publisher-returns=true
connectionFactory.setPublisherReturns(true);
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
//设置消息发送格式为json
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
rabbitTemplate.setMandatory(true);
//消息发送到exchange回调 需设置:spring.rabbitmq.publisher-confirms=true
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info("消息发送成功:correlationData({}),ack({}),cause({})",correlationData,ack,cause);
}
});
//消息从exchange发送到queue失败回调 需设置:spring.rabbitmq.publisher-returns=true
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}",exchange,routingKey,replyCode,replyText,message);
}
});
return rabbitTemplate;
}
//---------------------------------------订单队列------------------------------------------------------
/**
* 声明订单队列的交换机
* @return
*/
@Bean("orderTopicExchange")
public TopicExchange orderTopicExchange(){
//设置为持久化 不自动删除
return new TopicExchange(env.getProperty("order.mq.exchange.name"),true,false);
}
/**
* 声明订单队列
* @return
*/
@Bean("orderQueue")
public Queue orderQueue(){
return new Queue(env.getProperty("order.mq.queue.name"),true);
}
/**
* 将队列绑定到交换机
* @return
*/
@Bean
public Binding simpleBinding(){
return BindingBuilder.bind(orderQueue()).to(orderTopicExchange()).with(env.getProperty("order.mq.routing.key"));
}
/**
* 注入订单对列消费监听器
*/
@Autowired
private OrderListener orderListener;
/**
* 声明订单队列监听器配置容器
* @return
*/
@Bean("orderListenerContainer")
public SimpleMessageListenerContainer orderListenerContainer(){
//创建监听器容器工厂
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
//将配置信息和链接信息赋给容器工厂
factoryConfigurer.configure(factory,connectionFactory);
//容器工厂创建监听器容器
SimpleMessageListenerContainer container = factory.createListenerContainer();
//指定监听器
container.setMessageListener(orderListener);
//指定监听器监听的队列
container.setQueues(orderQueue());
return container;
}
}
配置类声明了订单队列,交换机,通过指定的routingkey绑定了队列与交换机。另外,rabbitTemplate用来发送消息,ListenerContainer指定监听器(消费者)监听的队列。
客户下单,生产消息,上代码:
@Service
public class SeckillService {
private static final Logger logger = LoggerFactory.getLogger(SeckillService.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Environment env;
/**
* 生产消息
* @param order
*/
public void seckill(OrderRequest order){
//设置交换机
rabbitTemplate.setExchange(env.getProperty("order.mq.exchange.name"));
//设置routingkey
rabbitTemplate.setRoutingKey(env.getProperty("order.mq.routing.key"));
//创建消息体
Message msg = MessageBuilder.withBody(JSON.toJSONString(order).getBytes()).build();
//发送消息
rabbitTemplate.convertAndSend(msg);
}
}
很简单,操作rabbitTemplate,指定交换机和routingkey,发送消息到绑定的队列,等待消费处理。
3.减少库存
消费者消费订单消息,做业务处理。 看一下监听器(消费者)OrderListener:
/**
* 消息监听器(消费者)
*/
@Component
public class OrderListener implements ChannelAwareMessageListener {
private static final Logger logger = LoggerFactory.getLogger(OrderListener.class);
@Autowired
private OrderService orderService;
/**
* 处理接收到的消息
* @param message 消息体
* @param channel 通道,确认消费用
* @throws Exception
*/
@Override
public void onMessage(Message message, Channel channel) throws Exception {
try{
//获取交付tag
long tag = message.getMessageProperties().getDeliveryTag();
String str = new String(message.getBody(),"utf-8");
logger.info("接收到的消息:{}",str);
JSONObject obj = JSONObject.parseObject(str);
//下单,操作数据库
orderService.order(obj.getString("userId"),obj.getString("goodsId"));
//确认消费
channel.basicAck(tag,true);
}catch(Exception e){
logger.error("消息监听确认机制发生异常:",e.fillInStackTrace());
}
}
}
业务处理 OrderService:
@Service
public class OrderService {
@Resource
private SeckillMapper seckillMapper;
/**
* 下单,操作数据库
* @param userId
* @param goodsId
*/
@Transactional()
public void order(String userId,String goodsId){
//该商品库存-1(当库存>0时)
int count = seckillMapper.reduceGoodsStockById(goodsId);
//更新成功,表明抢单成功,插入下单记录,支付状态设为2-待支付
if(count > 0){
OrderRecord orderRecord = new OrderRecord();
orderRecord.setId(CommonUtils.createUUID());
orderRecord.setGoodsId(goodsId);
orderRecord.setUserId(userId);
orderRecord.setPayStatus(2);
seckillMapper.insertOrderRecord(orderRecord);
}
}
}
Dao接口和Mybatis文件就不往出贴了,这里的逻辑是,update goods_info set goods_stock = goods_stock-1 where goods_stock > 0 and id=#{goodsId},这条update相当于将查询库存和减少库存合并为一个原子操作,避免高并发问题,执行成功,插入订单记录,执行失败,则库存不够抢单失败。
4.支付
订单处理完成后,如果库存减少,也就是抢单成功,那么需要用户在十五分钟内完成支付,这块就要用到死信队列(延迟队列)来处理了,先看模型图: DLX:dead-letter Exchange 死信交换机 DLK:dead-letter RoutingKey 死信路由 ttl:time-to-live 超时时间 死信队列中,消息到期后,会通过DLX和DLK进入到pay-queue,进行消费。这是另一组消息队列,和订单消息队列是分开的。这里注意他们的绑定关系,主交换机绑定死信队列,死信交换机绑定的是主队列(pay queue)。 接下来声明图中的一系列组件,首先application.properties中增加配置:
#支付处理队列
#主交换机
pay.mq.exchange.name=${mq.env}:pay:mq:exchange
#死信交换机(DLX)
pay.dead-letter.mq.exchange.name=${mq.env}:pay:dead-letter:mq:exchange
#主队列
pay.mq.queue.name=${mq.env}:pay:mq:queue
#死信队列
pay.dead-letter.mq.queue.name=${mq.env}:pay:dead-letter:mq:queue
#主routingkey
pay.mq.routing.key=${mq.env}:pay:mq:routing:key
#死信routingkey(DLK)
pay.dead-letter.mq.routing.key=${mq.env}:pay:dead-letter:mq:routing:key
#支付超时时间(毫秒)(TTL),测试原因,这里模拟5秒,如果是生产环境,这里可以是15分钟等
pay.mq.ttl=5000
配置类OrderRabbitmqConfig中增加支付队列和死信队列的声明:
/**
* 死信队列,十五分钟超时
* @return
*/
@Bean
public Queue payDeadLetterQueue(){
Map args = new HashMap();
//声明死信交换机
args.put("x-dead-letter-exchange",env.getProperty("pay.dead-letter.mq.exchange.name"));
//声明死信routingkey
args.put("x-dead-letter-routing-key",env.getProperty("pay.dead-letter.mq.routing.key"));
//声明死信队列中的消息过期时间
args.put("x-message-ttl",env.getProperty("pay.mq.ttl",int.class));
//创建死信队列
return new Queue(env.getProperty("pay.dead-letter.mq.queue.name"),true,false,false,args);
}
/**
* 支付队列交换机(主交换机)
* @return
*/
@Bean
public TopicExchange payTopicExchange(){
return new TopicExchange(env.getProperty("pay.mq.exchange.name"),true,false);
}
/**
* 将主交换机绑定到死信队列
* @return
*/
@Bean
public Binding payBinding(){
return BindingBuilder.bind(payDeadLetterQueue()).to(payTopicExchange()).with(env.getProperty("pay.mq.routing.key"));
}
/**
* 支付队列(主队列)
* @return
*/
@Bean
public Queue payQueue(){
return new Queue(env.getProperty("pay.mq.queue.name"),true);
}
/**
* 死信交换机
* @return
*/
@Bean
public TopicExchange payDeadLetterExchange(){
return new TopicExchange(env.getProperty("pay.dead-letter.mq.exchange.name"),true,false);
}
/**
* 将主队列绑定到死信交换机
* @return
*/
@Bean
public Binding payDeadLetterBinding(){
return BindingBuilder.bind(payQueue()).to(payDeadLetterExchange()).with(env.getProperty("pay.dead-letter.mq.routing.key"));
}
/**
* 注入支付监听器
*/
@Autowired
private PayListener payListener;
/**
* 支付队列监听器容器
* @return
*/
@Bean
public SimpleMessageListenerContainer payMessageListenerContainer(){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factoryConfigurer.configure(factory,connectionFactory);
SimpleMessageListenerContainer listenerContainer = factory.createListenerContainer();
listenerContainer.setMessageListener(payListener);
listenerContainer.setQueues(payQueue());
return listenerContainer;
}
支付队列和死信队列的Queue、Exchange、routingkey都已就绪。 看生产者:
@Service
public class OrderService {
@Resource
private SeckillMapper seckillMapper;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private Environment env;
/**
* 下单,操作数据库
* @param userId
* @param goodsId
*/
@Transactional()
public void order(String userId,String goodsId){
//该商品库存-1(当库存>0时)
int count = seckillMapper.reduceGoodsStockById(goodsId);
//更新成功,表明抢单成功,插入下单记录,支付状态设为2-待支付
if(count > 0){
OrderRecord orderRecord = new OrderRecord();
orderRecord.setId(CommonUtils.createUUID());
orderRecord.setGoodsId(goodsId);
orderRecord.setUserId(userId);
orderRecord.setPayStatus(2);
seckillMapper.insertOrderRecord(orderRecord);
//将该订单添加到支付队列
rabbitTemplate.setExchange(env.getProperty("pay.mq.exchange.name"));
rabbitTemplate.setRoutingKey(env.getProperty("pay.mq.routing.key"));
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
String json = JSON.toJSONString(orderRecord);
Message msg = MessageBuilder.withBody(json.getBytes()).build();
rabbitTemplate.convertAndSend(msg);
}
}
}
在OrderService中,数据库操作完成后,将订单信息发送到死信队列,死信队列中的消息会在十五分钟后进入到支付队列,等待消费。 再看消费者:
@Component
public class PayListener implements ChannelAwareMessageListener {
private static final Logger logger = LoggerFactory.getLogger(PayListener.class);
@Autowired
private PayService payService;
@Override
public void onMessage(Message message, Channel channel) throws Exception {
Long tag = message.getMessageProperties().getDeliveryTag();
try {
String str = new String(message.getBody(), "utf-8");
logger.info("接收到的消息:{}",str);
JSONObject json = JSON.parseObject(str);
String orderId = json.getString("id");
//确认是否付款
payService.confirmPay(orderId);
//确认消费
channel.basicAck(tag, true);
}catch(Exception e){
logger.info("支付消息消费出错:{}",e.getMessage());
logger.info("出错的tag:{}",tag);
}
}
}
PayService:
@Service
public class PayService {
private static final Logger logger = LoggerFactory.getLogger(PayService.class);
@Resource
private SeckillMapper seckillMapper;
/**
* 确认是否支付
* @param orderId
*/
public void confirmPay(String orderId){
OrderRecord orderRecord = seckillMapper.selectNoPayOrderById(orderId);
//根据订单号校验该用户是否已支付
if(checkPay(orderId)){
//已支付
orderRecord.setPayStatus(1);
seckillMapper.updatePayStatus(orderRecord);
logger.info("用户{}已支付",orderId);
}else{
//未支付
orderRecord.setPayStatus(0);
seckillMapper.updatePayStatus(orderRecord);
//取消支付后,商品库存+1
seckillMapper.returnStock(orderRecord.getGoodsId());
logger.info("用户{}未支付",orderId);
}
}
/**
* 模拟判断订单支付成功或失败,成功失败随机
* @param orderId
* @return
*/
public boolean checkPay(String orderId){
Random random = new Random();
int res = random.nextInt(2);
return res==0?false:true;
}
这里checkPay()方法模拟调用第三方支付接口来判断用户是否已支付。若支付成功,订单改为已支付状态,支付失败,改为已取消状态,库存退回。
总结
整个demo,是两组消息队列撑起来的,一组订单消息队列,一组支付消息队列,而每一组队列都是由queue、exchange、routingkey、生产者以及消费者组成。交换机通过routingkey绑定队列,rabbitTemplate通过指定交换机和routingkey将消息发送到指定队列,消费者监听该队列进行消费。不同的是第二组支付队列里嵌入了死信队列来做一个十五分钟的延迟支付。