我在生产项目里是如何使用Redis发布订阅的?(二)Java版代码实现(含源码)
2020-12-13 06:28
标签:pool 业务 添加 web object als type 初始化 cto 上篇文章讲了在实际项目里的哪些业务场景用到Redis发布订阅,这篇文章就讲一下,在Java中如何实现的。 发布订阅的理论以及使用场景大家都已经有了大致了解了,但是怎么用代码实现发布订阅呢?在这里给大家分享一下实现方式。 我们以上篇文章的第三种使用场景为例,先来看一下整体实现类图吧。 解释一下,这里我们首先定义一个统一接口`ICacheUpdate`,只有一个`update`方法,我们令`Service`层实现这个方法,执行具体的更新操作。 我们再来看`RedisMsgPubSub`,它继承`redis.clients.jedis.JedisPubSub`,主要重写其`onMessage()`方法(订阅的频道有消息到来时会触发这个方法),我们在这个方法里调用`RedisMsgPubSub`的`update`方法执行更新操作。 当我们有多个`Service`实现`ICacheUpdate`时,我们就非常迫切地需要一个管理器来集中管理这些`Service`,并且当触发onMessage方法时要告诉onMessage方法具体调用哪个`ICacheUpdate`的实现类,所以我们有了`PubSubManager`。并且我们单独开启一个线程来维护发布订阅,所以管理器继承了`Thread`类。 具体代码: 统一接口 Service层 实现ICacheUpdate的update方法,执行具体的更新操作 Redis发布订阅的扩展类 作用: 1、统一管理ICacheUpdate,把所有实现ICacheUpdate接口的类添加到updates容器 2、重写onMessage方法,订阅到消息后进行刷新缓存的操作 发布订阅的管理器 执行的操作: 1、将所有需要刷新加载的Service类(实现ICacheUpdate接口)添加到RedisMsgPubSub的updates中 2、启动线程订阅pubsub_config频道,收到消息后的五秒后再次订阅(避免订阅到一次消息后结束订阅) 到此,Redis的发布订阅大致已经实现。我们什么时候启用呢?我们可以选择在启动项目时完成订阅和基础数据的加载,所以我们通过实现`javax.servlet.SevletContextListener`来完成这一操作。然后将监听器添加到`web.xml`。 CacheInitListener.java web.xml 【end】 上篇文章讲了在实际项目里的哪些业务场景用到Redis发布订阅,这篇文章就讲一下,在Java中如何实现的。 我在生产项目里是如何使用Redis发布订阅的?(二)Java版代码实现(含源码) 标签:pool 业务 添加 web object als type 初始化 cto 原文地址:https://www.cnblogs.com/ibigboy/p/11180600.html图解代码结构
代码实现
public interface ICacheUpdate {
public void update();
}
public class InfoService implements ICacheUpdate {
private static Logger logger = LoggerFactory.getLogger(InfoService.class);
@Autowired
private RedisCache redisCache;
@Autowired
private InfoMapper infoMapper;
/**
* 按信息类型分类查询信息
* @return
*/
public Map
public class RedisMsgPubSub extends JedisPubSub {
private static Logger logger = LoggerFactory.getLogger(RedisMsgPubSub.class);
private Map
public class PubSubManager extends Thread{
private static Logger logger = LoggerFactory.getLogger(PubSubManager.class);
public static Jedis jedis;
RedisMsgPubSub msgPubSub = new RedisMsgPubSub();
//频道
public static final String PUNSUB_CONFIG = "pubsub_config";
//1.将所有需要刷新加载的Service类(实现ICacheUpdate接口)添加到RedisMsgPubSub的updates中
public boolean addListener(String key, ICacheUpdate listener){
return msgPubSub.addListener(key,listener);
}
@Override
public void run(){
while (true){
try {
JedisPool jedisPool = SpringTools.getBean("jedisPool", JedisPool.class);
if(jedisPool!=null){
jedis = jedisPool.getResource();
if(jedis!=null){
//2.启动线程订阅pubsub_config频道 阻塞
jedis.subscribe(msgPubSub,PUNSUB_CONFIG);
}
}
} catch (Exception e) {
logger.error("redis connect error!");
} finally {
if(jedis!=null)
jedis.close();
}
try {
//3.收到消息后的五秒后再次订阅(避免订阅到一次消息后结束订阅)
Thread.sleep(5000);
} catch (InterruptedException e) {
logger.error("InterruptedException in redis sleep!");
}
}
}
}
/**
* 加载系统参数
*/
public class CacheInitListener implements ServletContextListener{
private static Logger logger = LoggerFactory.getLogger(CacheInitListener.class);
@Override
public void contextDestroyed(ServletContextEvent arg0) {
}
@Override
public void contextInitialized(ServletContextEvent arg0) {
logger.info("---CacheListener初始化开始---");
init();
logger.info("---CacheListener初始化结束---");
}
public void init() {
try {
//获得管理器
PubSubManager pubSubManager = SpringTools.getBean("pubSubManager", PubSubManager.class);
InfoService infoService = SpringTools.getBean("infoService", InfoService.class);
//添加到管理器
pubSubManager.addListener("infoService", infoService);
//other service...
//启动线程执行订阅操作
pubSubManager.start();
//初始化加载
loadParamToRedis();
} catch (Exception e) {
logger.info(e.getMessage(), e);
}
}
private void loadParamToRedis() {
InfoService infoService = SpringTools.getBean("infoService", InfoService.class);
infoService.update();
//other service...
}
}
上一篇:CreateWindow
下一篇:win8.1桌面上快捷方式的箭头
文章标题:我在生产项目里是如何使用Redis发布订阅的?(二)Java版代码实现(含源码)
文章链接:http://soscw.com/essay/33095.html