需求分析
在分享源码之前,先将b2b2c系统中商品模块需求整理、明确,方便源码的理解。
业务需求
- b2b2c电子商务系统中商品的库存存放在redis和数据库中,实现发货退货等操作库存的扣减或增加
技术需求
- redis事务问题,若扣减库存完成后,发生异常,则redis没有事务,无法实现数据回滚,导致数据异常
- 采用lua脚本扣减库存方式,原子性提交操作,lua脚本中实现扣减失败则回滚操作
- 数据库中的库存信息,非实时更新,而是采用缓冲池方式,缓冲池方式可以自主选择是否开启
架构思路
商品库存领域模型架构
基于lua+redis的库存扣减
GoodsQuantityVO
/**
* 商品库存vo
* @author fk
* @version v6.4
* @since v6.4
* 2017年9月7日 上午11:23:16
*/
public class GoodsQuantityVO implements Cloneable{
private Integer goodsId;
private Integer skuId;
private Integer quantity;
private QuantityType quantityType;
public GoodsQuantityVO() {}
public GoodsQuantityVO(Integer goodsId, Integer skuId, Integer quantity ) {
super();
this.goodsId = goodsId;
this.skuId = skuId;
this.quantity = quantity;
}
setter and getter
}
GoodsQuantityManager
/**
* 商品库存接口
* @author fk
* @version v2.0
* @since v7.0.0
* 2018年3月23日 上午11:47:29
*
* @version 3.0
* 统一为一个接口(更新接口)<br/>
* 内部实现为redis +lua 保证原子性 -- by kingapex 2019-01-17
*/
public interface GoodsQuantityManager {
/**
* 库存更新接口
* @param goodsQuantityList 要更新的库存vo List
* @return 如果更新成功返回真,否则返回假
*/
Boolean updateSkuQuantity(List<GoodsQuantityVO> goodsQuantityList );
/**
* 同步数据库数据
*/
void syncDataBase();
/**
* 为某个sku 填充库存cache<br/>
* 库存数量由数据库中获取<br/>
* 一般用于缓存被击穿的情况
* @param skuId
* @return 可用库存和实际库存
*/
Map<String,Integer> fillCacheFromDB(int skuId);
}
GoodsQuantityManagerImpl
库存业务类基于lua+redis的实现:
/**
* 商品库存接口
*
* @author fk
* @author kingapex
* @version v2.0 written by kingapex 2019年2月27日
* 采用lua脚本执行redis中的库存扣减<br/>
* 数据库的更新采用非时时同步<br/>
* 而是建立了一个缓冲池,当达到一定条件时再同步数据库<br/>
* 这样条件有:缓冲区大小,缓冲次数,缓冲时间<br/>
* 上述条件在配置中心可以配置,如果没有配置采用 ${@link UpdatePool} 默认值<br/>
* 在配置项说明:<br/>
* <li>缓冲区大小:javashop.pool.stock.max-pool-size</li>
* <li>缓冲次数:javashop.pool.stock.max-update-time</li>
* <li>缓冲时间(秒数):javashop.pool.stock.max-lazy-second</li>
* @see JavashopConfig
*/
@Service
public class GoodsQuantityManagerImpl implements GoodsQuantityManager {
private final Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private DaoSupport daoSupport;
@Autowired
private JavashopConfig javashopConfig;
/**
* sku库存更新缓冲池
*/
private static UpdatePool skuUpdatePool;
/**
* goods库存更新缓冲池
*/
private static UpdatePool goodsUpdatePool;
/**
* 单例获取sku pool ,初始化时设置参数
*
* @return
*/
private UpdatePool getSkuPool() {
if (skuUpdatePool == null) {
skuUpdatePool = new UpdatePool(javashopConfig.getMaxUpdateTime(), javashopConfig.getMaxPoolSize(), javashopConfig.getMaxLazySecond());
logger.debug("初始化sku pool:");
logger.debug(skuUpdatePool.toString());
}
return skuUpdatePool;
}
/**
* 单例获取goods pool ,初始化时设置参数
*
* @return
*/
private UpdatePool getGoodsPool() {
if (goodsUpdatePool == null) {
goodsUpdatePool = new UpdatePool(javashopConfig.getMaxUpdateTime(), javashopConfig.getMaxPoolSize(), javashopConfig.getMaxLazySecond());
}
return goodsUpdatePool;
}
@Autowired
public StringRedisTemplate stringRedisTemplate;
private static RedisScript<Boolean> script = null;
private static RedisScript<Boolean> getRedisScript() {
if (script != null) {
return script;
}
ScriptSource scriptSource = new ResourceScriptSource(new ClassPathResource("sku_quantity.lua"));
String str = null;
try {
str = scriptSource.getScriptAsString();
} catch (IOException e) {
e.printStackTrace();
}
script = RedisScript.of(str, Boolean.class);
return script;
}
@Override
public Boolean updateSkuQuantity(List<GoodsQuantityVO> goodsQuantityList) {
List<Integer> skuIdList = new ArrayList();
List<Integer> goodsIdList = new ArrayList();
List keys = new ArrayList<>();
List values = new ArrayList<>();
for (GoodsQuantityVO quantity : goodsQuantityList) {
Assert.notNull(quantity.getGoodsId(), "goods id must not be null");
Assert.notNull(quantity.getSkuId(), "sku id must not be null");
Assert.notNull(quantity.getQuantity(), "quantity id must not be null");
Assert.notNull(quantity.getQuantityType(), "Type must not be null");
//sku库存
if (QuantityType.enable.equals(quantity.getQuantityType())) {
keys.add(StockCacheKeyUtil.skuEnableKey(quantity.getSkuId()));
} else if (QuantityType.actual.equals(quantity.getQuantityType())) {
keys.add(StockCacheKeyUtil.skuActualKey(quantity.getSkuId()));
}
values.add("" + quantity.getQuantity());
//goods库存key
if (QuantityType.enable.equals(quantity.getQuantityType())) {
keys.add(StockCacheKeyUtil.goodsEnableKey(quantity.getGoodsId()));
} else if (QuantityType.actual.equals(quantity.getQuantityType())) {
keys.add(StockCacheKeyUtil.goodsActualKey(quantity.getGoodsId()));
}
values.add("" + quantity.getQuantity());
skuIdList.add(quantity.getSkuId());
goodsIdList.add(quantity.getGoodsId());
}
RedisScript<Boolean> redisScript = getRedisScript();
Boolean result = stringRedisTemplate.execute(redisScript, keys, values.toArray());
logger.debug("更新库存:");
logger.debug(goodsQuantityList.toString());
logger.debug("更新结果:" + result);
//如果lua脚本执行成功则记录缓冲区
if (result) {
//判断配置文件中设置的商品库存缓冲池是否开启
if (javashopConfig.isStock()) {
//是否需要同步数据库
boolean needSync = getSkuPool().oneTime(skuIdList);
getGoodsPool().oneTime(goodsIdList);
logger.debug("是否需要同步数据库:" + needSync);
logger.debug(getSkuPool().toString());
//如果开启了缓冲池,并且缓冲区已经饱和,则同步数据库
if (needSync) {
syncDataBase();
}
} else {
//如果未开启缓冲池,则实时同步商品数据库中的库存数据
syncDataBase(skuIdList, goodsIdList);
}
}
return result;
}
@Override
public void syncDataBase() {
//获取同步的skuid 和goodsid
List<Integer> skuIdList = getSkuPool().getTargetList();
List<Integer> goodsIdList = getGoodsPool().getTargetList();
logger.debug("goodsIdList is:");
logger.debug(goodsIdList.toString());
//判断要同步的goods和sku集合是否有值
if (skuIdList.size() != 0 && goodsIdList.size() != 0) {
//同步数据库
syncDataBase(skuIdList, goodsIdList);
}
//重置缓冲池
getSkuPool().reset();
getGoodsPool().reset();
}
@Override
public Map<String, Integer> fillCacheFromDB(int skuId) {
Map<String, Integer> map = daoSupport.queryForMap("select enable_quantity,quantity from es_goods_sku where sku_id=?", skuId);
Integer enableNum = map.get("enable_quantity");
Integer actualNum = map.get("quantity");
stringRedisTemplate.opsForValue().set(StockCacheKeyUtil.skuActualKey(skuId), "" + actualNum);
stringRedisTemplate.opsForValue().set(StockCacheKeyUtil.skuEnableKey(skuId), "" + enableNum);
return map;
}
/**
* 同步数据库中的库存
*
* @param skuIdList 需要同步的skuid
* @param goodsIdList 需要同步的goodsid
*/
private void syncDataBase(List<Integer> skuIdList, List<Integer> goodsIdList) {
//要形成的指更新sql
List<String> sqlList = new ArrayList<String>();
//批量获取sku的库存
List skuKeys = StockCacheKeyUtil.skuKeys(skuIdList);
List<String> skuQuantityList = stringRedisTemplate.opsForValue().multiGet(skuKeys);
int i = 0;
//形成批量更新sku的list
for (Integer skuId : skuIdList) {
String sql = "update es_goods_sku set enable_quantity=" + skuQuantityList.get(i) + ", quantity=" + skuQuantityList.get(i + 1) + " where sku_id=" + skuId;
daoSupport.execute(sql);
i = i + 2;
}
//批量获取商品的库存
List goodsKeys = createGoodsKeys(goodsIdList);
List<String> goodsQuantityList = stringRedisTemplate.opsForValue().multiGet(goodsKeys);
i = 0;
//形成批量更新goods的list
for (Integer goodsId : goodsIdList) {
String sql = "update es_goods set enable_quantity=" + goodsQuantityList.get(i) + ", quantity=" + goodsQuantityList.get(i + 1) + " where goods_id=" + goodsId;
daoSupport.execute(sql);
i = i + 2;
}
}
/**
* 生成批量获取goods库存的keys
*
* @param goodsIdList
* @return
*/
private List createGoodsKeys(List<Integer> goodsIdList) {
List keys = new ArrayList();
for (Integer goodsId : goodsIdList) {
keys.add(StockCacheKeyUtil.goodsEnableKey(goodsId));
keys.add(StockCacheKeyUtil.goodsActualKey(goodsId));
}
return keys;
}
}
sku_quantity.lua
库存扣减lua脚本
-- 可能回滚的列表,一个记录要回滚的skuid一个记录库存
local skuid_list= {}
local stock_list= {}
local arg_list = ARGV;
local function cut ( key , num )
KEYS[1] = key;
local value = redis.call("get",KEYS[1])
if not value then
value = 0;
end
value=value+num
if(value<0)
then
-- 发生超卖
return false;
end
redis.call("set",KEYS[1],value)
return true
end
local function rollback ( )
for i,k in ipairs (skuid_list) do
-- 还原库存
KEYS[1] = k;
redis.call("incrby",KEYS[1],0-stock_list[i])
end
end
local function doExec()
for i, k in ipairs (arg_list)
do
local num = tonumber(k)
local key= KEYS[i]
local result = cut(key,num)
-- 发生超卖,需要回滚
if (result == false)
then
rollback()
return false
else
-- 记录可能要回滚的数据
table.insert(skuid_list,key)
table.insert(stock_list,num)
end
end
return true;
end
return doExec()
JavashopConfig
缓冲池相关设置信息
/**
* javashop配置
*
* @author zh
* @version v7.0
* @date 18/4/13 下午8:19
* @since v7.0
*/
@Configuration
@ConfigurationProperties(prefix = "javashop")
@SuppressWarnings("ConfigurationProperties")
public class JavashopConfig {
/**
* 缓冲次数
*/
@Value("${javashop.pool.stock.max-update-timet:#{null}}")
private Integer maxUpdateTime;
/**
* 缓冲区大小
*/
@Value("${javashop.pool.stock.max-pool-size:#{null}}")
private Integer maxPoolSize;
/**
* 缓冲时间(秒数)
*/
@Value("${javashop.pool.stock.max-lazy-second:#{null}}")
private Integer maxLazySecond;
/**
* 商品库存缓冲池开关
* false:关闭(如果配置文件中没有配置此项,则默认为false)
* true:开启(优点:缓解程序压力;缺点:有可能会导致商家中心商品库存数量显示延迟;)
*/
@Value("${javashop.pool.stock:#{false}}")
private boolean stock;
public JavashopConfig() {
}
setter and getter...
}
以上是javashop中商品模块扣减库存的思路以及相关源码。
易族智汇(javashop)原创文章