需求场景:分布式项目中,每个子项目有各自的 user 数据库, 在综合管理系统中存放这所有用户信息, 为了保持综合管理系统用户的完整性,
子系统添加用户后将用户信息以json格式保存至redis,然后发布到消息到消息通道,综合管理系统监控到子系统发布的消息前往redis
获取出用户信息保存到自己的数据库
1)redis配置
1 spring:
2 redis:
3 #数据库索引
4 database: 5
7 host: 127.0.0.1
8 port: 6379
9 password: 123456
10 jedis:
11 pool:
12 #最大连接数
13 max-active: 8
14 #最大阻塞等待时间(负数表示没限制)
15 #最大空闲
16 max-idle: 8
17 #最小空闲
18 min-idle: 0
2)集成redis , 初始化redis组件
1 package com.bigcustomer.configs;
2
3
4 import com.bigcustomer.utils.redisUtil.RedisService;
5 import com.fasterxml.jackson.annotation.JsonAutoDetect;
6 import com.fasterxml.jackson.annotation.PropertyAccessor;
7 import com.fasterxml.jackson.databind.ObjectMapper;
8 import org.slf4j.Logger;
9 import org.slf4j.LoggerFactory;
10 import org.springframework.beans.factory.annotation.Autowired;
11 import org.springframework.cache.annotation.CachingConfigurerSupport;
12 import org.springframework.cache.annotation.EnableCaching;
13 import org.springframework.context.annotation.Bean;
14 import org.springframework.context.annotation.Configuration;
15 import org.springframework.data.redis.connection.RedisConnectionFactory;
16 import org.springframework.data.redis.core.RedisTemplate;
17 import org.springframework.data.redis.core.StringRedisTemplate;
18 import org.springframework.data.redis.listener.PatternTopic;
19 import org.springframework.data.redis.listener.RedisMessageListenerContainer;
20 import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
21 import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
22
23
24 /**
25 * @author :CX
26 * @Date :Create in 2018/8/15 14:03
27 * @Effect :
28 */
29
30 @Configuration
31 @EnableCaching//开启注解
32 public class RedisConfig extends CachingConfigurerSupport {
33
34
35 private static Logger logger = LoggerFactory.getLogger(RedisConfig.class);
36 // 自定义的配置类, 存放了通道地址
37 @Autowired
38 private BaseConfig baseConfig;
39
40 /**
41 *@参数
42 *@返回值
43 *@创建人 cx
44 *@创建时间
45 *@描述 //初始化监听器
46 */
47 @Bean
48 RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
49 MessageListenerAdapter listenerAdapter) {
50
51 RedisMessageListenerContainer container = new RedisMessageListenerContainer();
52 container.setConnectionFactory(connectionFactory);
53 //配置监听通道
54 container.addMessageListener(listenerAdapter, new PatternTopic(baseConfig.getRedisAisle()));// 通道的名字
55 logger.info("初始化监听成功,监听通道:【"+baseConfig.getRedisAisle()+"】");
56 return container;
57 }
58
59 /**
60 *@参数
61 *@返回值
62 *@创建人 cx
63 *@创建时间
64 *@描述 利用反射来创建监听到消息之后的执行方法
65 */
66 @Bean
67 MessageListenerAdapter listenerAdapter(RedisService receiver) {
68 return new MessageListenerAdapter(receiver, "receiveMessage");
69 }
70
71 // /**
72 // *@参数
73 // *@返回值
74 // *@创建人 cx
75 // *@创建时间
76 // *@描述 控制线程用的
77 // */
78 // @Bean
79 // Receiver receiver(CountDownLatch latch) {
80 // return new Receiver(latch);
81 // }
82 //
83 // @Bean
84 // CountDownLatch latch() {
85 // return new CountDownLatch(1);
86 // }
87
88 /**
89 *@参数
90 *@返回值
91 *@创建人 cx
92 *@创建时间
93 *@描述 //使用默认的工厂初始化redis操作String模板
94 */
95 @Bean
96 StringRedisTemplate template(RedisConnectionFactory connectionFactory) {
97 return new StringRedisTemplate(connectionFactory);
98 }
99 /**
100 *@参数
101 *@返回值
102 *@创建人 cx
103 *@创建时间
104 *@描述 //使用默认的工厂初始化redis操作map模板
105 */
106 @Bean
107 RedisTemplate redisTemplate(RedisConnectionFactory connectionFactory) {
108
109 Jackson2JsonRedisSerializer<Object> jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer<Object>(Object.class);
110 ObjectMapper om = new ObjectMapper();
111 om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
112 om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
113 jackson2JsonRedisSerializer.setObjectMapper(om);
114 RedisTemplate<String, Object> template = new RedisTemplate<String, Object>();
115 template.setConnectionFactory(connectionFactory);
116 template.setKeySerializer(jackson2JsonRedisSerializer);
117 template.setValueSerializer(jackson2JsonRedisSerializer);
118 template.setHashKeySerializer(jackson2JsonRedisSerializer);
119 template.setHashValueSerializer(jackson2JsonRedisSerializer);
120 template.afterPropertiesSet();
121 return template;
122
123 }
124 }
3 ) 操作string 和 map 的dao封装
1 package com.bigcustomer.utils.redisUtil;
2
3 import com.alibaba.fastjson.JSON;
4 import com.bigcustomer.biguser.service.BigUserService;
5 import org.slf4j.Logger;
6 import org.slf4j.LoggerFactory;
7 import org.springframework.data.redis.core.RedisTemplate;
8 import org.springframework.data.redis.core.StringRedisTemplate;
9 import org.springframework.stereotype.Component;
10
11 import javax.annotation.Resource;
12 import java.util.Map;
13
14 /**
15 * @author :CX
16 * @Date :Create in 2018/8/15 14:19
17 * @Effect : redisDAO封裝
18 */
19 @Component
20 public class MyRedisDao {
21
22 private static Logger logger = LoggerFactory.getLogger(BigUserService.class);
23 @Resource
24 private StringRedisTemplate template;
25
26 @Resource
27 private RedisTemplate redisTemplate;
28
29 //大客户信息同步到redis时保存的map的key
30 private final String BIG_USER_REDIS_KEY = "CM:CHANNELCUSTOMER";
31
32 /**
33 * @参数
34 * @返回值
35 * @创建人 cx
36 * @创建时间
37 * @描述 大客户添加成功后存到redis
38 */
39 public boolean setMap(Map<String , Object> map) {
40
41 try {
42 redisTemplate.opsForHash().putAll(BIG_USER_REDIS_KEY
43 , map);
44 logger.info("同步大客户信息到redis 成功!userId【" + map.get("funiqueid")+ "】");
45 return true;
46 } catch (Exception e) {
47 e.printStackTrace();
48 }
49 logger.info("同步大客户信息到redis 失败!userId【" + map.get("funiqueid")+ "】");
50 return false;
51 }
52
53
54 public Object getMap(String key) {
55
56 try {
57 Object o = redisTemplate.opsForHash().get(BIG_USER_REDIS_KEY, key);
58 if (null != o) {
59 return o;
60 }
61 } catch (Exception e) {
62 e.printStackTrace();
63 }
64 logger.info("获取大客户信息到失败!");
65 return null;
66 }
67
68
69 /**
70 * @参数
71 * @返回值 存在 = true , 不纯在false
72 * @创建人 cx
73 * @创建时间
74 * @描述 判断是否存在 该key对应的值
75 */
76 public boolean isNull(String key) {
77 return template.hasKey(key);
78 }
79
80 /**
81 * @参数
82 * @返回值
83 * @创建人 cx
84 * @创建时间
85 * @描述 设置值 和 过期时间 单位秒
86 */
87 public boolean setValue(String key, Object val, long expire) {
88 if (!this.isNull(key)) {
89 //不存在
90 String jsonString = JSON.toJSONString(val);
91 template.opsForValue().set(key, jsonString, expire);
92 logger.info("***************************成功在缓存中插入:" + key);
93 return true;
94 } else {
95 logger.info("***************************【" + key + "】已经存在缓存");
96 return false;
97 }
98 }
99
100
101 /**
102 * @参数
103 * @返回值
104 * @创建人 cx
105 * @创建时间
106 * @描述 删除
107 */
108 public boolean del(String key) {
109 return template.delete(key);
110 }
111
112 /**
113 * @参数
114 * @返回值
115 * @创建人 cx
116 * @创建时间
117 * @描述 插入直接覆盖
118 */
119 public boolean setValue(String key, Object val) {
120 //不存在
121 String jsonString = JSON.toJSONString(val);
122 // 去掉多余的 “
123 String replace = jsonString.replace("\"", "");
124 template.opsForValue().set(key, replace);
125 logger.info("***************************成功在缓存中插入:" + key);
126 return true;
127 }
128
129 /**
130 * @参数
131 * @返回值
132 * @创建人 cx
133 * @创建时间
134 * @描述 获取对应key 的值
135 */
136 public String getValue(String key) {
137 if (!this.isNull(key)) {
138
139 //不存在
140 logger.info("***************************【" + key + "】不存在缓存");
141 return null;
142 } else {
143 return template.opsForValue().get(key);//根据key获取缓存中的val
144 }
145 }
146
147
148 }
4) 消息发布和监听的服务类
1 package com.bigcustomer.utils.redisUtil;
2
3 import com.bigcustomer.configs.BaseConfig;
4 import huashitech.kissucomponent.service.BaseService;
5 import org.springframework.beans.factory.annotation.Autowired;
6 import org.springframework.data.redis.core.StringRedisTemplate;
7 import org.springframework.stereotype.Service;
8
9 /**
10 * @author :CX
11 * @Date :Create in 2018/8/23 10:22
12 * @Effect : redis 通道消息发送和监听接受
13 */
14 @Service
15 public class RedisService extends BaseService {
16
17 @Autowired
18 private StringRedisTemplate template;
19 @Autowired
20 private BaseConfig baseConfig;
21 @Autowired
22 RedisService redisService;
23
24 /**
25 *@参数
26 *@返回值
27 *@创建人 cx
28 *@创建时间
29 *@描述 向默认通道发送消息
30 */
31 public void setMessage( Long funiqueid) {
32
33 template.convertAndSend(baseConfig.getRedisAisle(),
34 baseConfig.getRedisMessageName() +funiqueid);
35 }
36
37
38 /**
39 *@参数
40 *@返回值
41 *@创建人 cx
42 *@创建时间
43 *@描述 接受监听到的消息
44 */
45 public void receiveMessage(String message) {
46 logger.info("接收redis通道消息:"+message);
47 }
48 }
5) 使用
1 dao.getTransactionManager().doTransaction((TransactionStatus s) -> {
2 //插入数据库
3 int insert = dao.insert(tbCmChannelcustomerModel);
4 // 加入缓存
5 HashMap<String, Object> map = new HashMap<>();
6 map.put(tbCmChannelcustomerModel.getFuniqueid().toString()
7 , JSON.toJSONString(tbCmChannelcustomerModel));
8 redisDao.setMap(map);
9 // 发布redis通知消息
10 redisService.setMessage(tbCmChannelcustomerModel.getFuniqueid());
11 });