skip to content
Jayton's Blog

Redis的集群适配问题

/ 2 min read

对于RedisMessageListenerContainer改造如下

问题原因:

  • 节点隔离:每个节点只能接收发布到该节点的消息
  • 无跨节点转发:消息不会在集群节点间自动转发
  • 客户端连接单一:RedisMessageListenerContainer 通常只连接到集群中的一个节点

主要是对RedisMessageListenerContainer的改造 核心原理:

RedisMessageListenerContainer在监听时需要调用一个connection,但是默认调用的是getConnection() 而不是getClusterConnection(),所以需要为每一个master节点创建一个RedisMessageListenerContainer去做监听。创建的方式有很多,下面只是一种。

@Component
public class ClusterConfig implements ApplicationContextAware {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
private ApplicationContext applicationContext;
@Autowired
private RedisHsetListener redisHsetListener;
@Autowired
private RedisLpushListener redisLpushListener;
@PostConstruct
public void init() {
RedisClusterConnection redisClusterConnection = redisConnectionFactory.getClusterConnection();
DefaultListableBeanFactory beanFactory = (DefaultListableBeanFactory)applicationContext.getAutowireCapableBeanFactory();
if (redisClusterConnection != null) {
Iterable<RedisClusterNode> nodes = redisClusterConnection.clusterGetNodes();
for (RedisClusterNode node : nodes) {
if (node.isMaster()) {
String containerBeanName = "messageContainer" + node.hashCode();
if (beanFactory.containsBean(containerBeanName)) {
return;
}
JedisShardInfo jedisShardInfo = new JedisShardInfo(node.getHost(), node.getPort());
jedisShardInfo.setPassword("password");
JedisConnectionFactory factory = new JedisConnectionFactory(
jedisShardInfo);
BeanDefinitionBuilder containerBeanDefinitionBuilder = BeanDefinitionBuilder
.genericBeanDefinition(RedisMessageListenerContainer.class);
containerBeanDefinitionBuilder.addPropertyValue("connectionFactory", factory);
containerBeanDefinitionBuilder.setScope(BeanDefinition.SCOPE_SINGLETON);
containerBeanDefinitionBuilder.setLazyInit(false);
beanFactory.registerBeanDefinition(containerBeanName,
containerBeanDefinitionBuilder.getRawBeanDefinition());
RedisMessageListenerContainer container = beanFactory
.getBean(containerBeanName, RedisMessageListenerContainer.class);
String listenerBeanName = "messageListener" + node.hashCode();
if (beanFactory.containsBean(listenerBeanName)) {
return;
}
MessageListener messageListener = new RedisHsetListener();
MessageListener lPushListener = new RedisLpushListener();
container.addMessageListener(redisHsetListener, new PatternTopic("__keyevent@0__:set"));
container.addMessageListener(redisLpushListener,new PatternTopic("__keyevent@*__:lpush"));
container.start();
}
}
}
}
@Override
public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
this.applicationContext = applicationContext;
}
}
完结!