对于RedisMessageListenerContainer改造如下
问题原因:
- 节点隔离:每个节点只能接收发布到该节点的消息
- 无跨节点转发:消息不会在集群节点间自动转发
- 客户端连接单一:RedisMessageListenerContainer 通常只连接到集群中的一个节点
主要是对RedisMessageListenerContainer的改造 核心原理:
RedisMessageListenerContainer在监听时需要调用一个connection,但是默认调用的是getConnection() 而不是getClusterConnection(),所以需要为每一个master节点创建一个RedisMessageListenerContainer去做监听。创建的方式有很多,下面只是一种。
@Componentpublic class ClusterConfig implements ApplicationContextAware {
@Autowired private RedisConnectionFactory redisConnectionFactory;
private ApplicationContext applicationContext;
@Autowired private RedisHsetListener redisHsetListener;
@Autowired private RedisLpushListener redisLpushListener;
@PostConstruct public void init() { RedisClusterConnection redisClusterConnection = redisConnectionFactory.getClusterConnection(); DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory)applicationContext.getAutowireCapableBeanFactory();
if (redisClusterConnection != null) { Iterable<RedisClusterNode> nodes = redisClusterConnection.clusterGetNodes(); for (RedisClusterNode node : nodes) { if (node.isMaster()) { String containerBeanName = "messageContainer" + node.hashCode(); if (beanFactory.containsBean(containerBeanName)) { return; } JedisShardInfo jedisShardInfo = new JedisShardInfo(node.getHost(), node.getPort()); jedisShardInfo.setPassword("password"); JedisConnectionFactory factory = new JedisConnectionFactory( jedisShardInfo); BeanDefinitionBuilder containerBeanDefinitionBuilder = BeanDefinitionBuilder .genericBeanDefinition(RedisMessageListenerContainer.class); containerBeanDefinitionBuilder.addPropertyValue("connectionFactory", factory); containerBeanDefinitionBuilder.setScope(BeanDefinition.SCOPE_SINGLETON); containerBeanDefinitionBuilder.setLazyInit(false);
beanFactory.registerBeanDefinition(containerBeanName, containerBeanDefinitionBuilder.getRawBeanDefinition());
RedisMessageListenerContainer container = beanFactory .getBean(containerBeanName, RedisMessageListenerContainer.class); String listenerBeanName = "messageListener" + node.hashCode(); if (beanFactory.containsBean(listenerBeanName)) { return; } MessageListener messageListener = new RedisHsetListener(); MessageListener lPushListener = new RedisLpushListener(); container.addMessageListener(redisHsetListener, new PatternTopic("__keyevent@0__:set")); container.addMessageListener(redisLpushListener,new PatternTopic("__keyevent@*__:lpush")); container.start(); } } } }
@Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { this.applicationContext = applicationContext; }}
完结!