package com.thhy.screen.modules.biz.bigscreen.controller;

import com.thhy.general.common.enums.TopicConstant;
import com.thhy.general.config.GlobalConfig;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.redisson.api.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

/**
 * RocketMQ consumer for the {@code prolisten} topic that re-publishes every
 * received payload once per session key currently stored in Redis.
 *
 * <p>Session keys are looked up with the pattern
 * {@code <keyPrefix>:ws:sessioninfo:*}. The part of each key after that prefix
 * is split on {@code '-'}; its first two segments form the tag of the outgoing
 * destination {@code TopicConstant.PROLISTENEXC + ":" + first + "-" + second}.
 * The existing log message refers to the first segment as the target machine;
 * the meaning of the second segment is not visible in this file.
 */
@Component
@RocketMQMessageListener(topic = "prolisten", consumerGroup = TopicConstant.PROLISTEN_GROUP)
public class ProListenMqRouter implements RocketMQListener<String> {

    /** Fixed part of the Redis session key that follows the configured key prefix. */
    private static final String SESSION_KEY_INFIX = ":ws:sessioninfo:";

    // Logger name is kept as the original plain string so existing logging configuration still matches.
    private static final Logger logger = LoggerFactory.getLogger("ProListenMqRouter");

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Autowired
    private RedissonClient redissonClient;

    @Autowired
    private GlobalConfig globalConfig;

    /**
     * Forwards the received payload to one destination per session key found in Redis.
     *
     * <p>Keys whose identifier does not contain at least two {@code '-'}-separated
     * segments are logged and skipped instead of raising
     * {@link ArrayIndexOutOfBoundsException}. Exceptions thrown by
     * {@code syncSend} are not caught here and propagate to the caller.
     *
     * @param s the message payload received from the {@code prolisten} topic
     */
    @Override
    public void onMessage(String s) {
        logger.info("到达生产监控路由");
        List<String> keys = getAllSessions();
        logger.info("从redis获取Session——" + keys.size());

        // Strip exactly the prefix that was used to look the keys up, rather than a
        // hard-coded "pipe:..." literal that only matches one particular configuration.
        String sessionKeyPrefix = sessionKeyPrefix();
        for (String key : keys) {
            String sessionId = key.startsWith(sessionKeyPrefix)
                    ? key.substring(sessionKeyPrefix.length())
                    : key;
            String[] hid = sessionId.split("-");
            if (hid.length < 2) {
                logger.warn("Skipping session key with unexpected format: " + key);
                continue;
            }
            Message<String> message = MessageBuilder.withPayload(s).build();
            rocketMQTemplate.syncSend(TopicConstant.PROLISTENEXC + ":" + hid[0] + "-" + hid[1], message);
            logger.info("向机器" + hid[0] + "发送消息成功");
        }
    }

    /**
     * Collects all Redis keys matching {@code <keyPrefix>:ws:sessioninfo:*}.
     *
     * @return a new mutable list with the full key names; empty when nothing matches
     */
    public List<String> getAllSessions() {
        RKeys rKeys = redissonClient.getKeys();
        Iterable<String> keysByPattern = rKeys.getKeysByPattern(sessionKeyPrefix() + "*");
        List<String> sessionKeys = new ArrayList<>();
        for (String sessionKey : keysByPattern) {
            sessionKeys.add(sessionKey);
        }
        return sessionKeys;
    }

    /** Builds the common prefix shared by every session key: {@code <keyPrefix>:ws:sessioninfo:}. */
    private String sessionKeyPrefix() {
        return globalConfig.getKeyPrefix() + SESSION_KEY_INFIX;
    }
}