package com.thhy.screen.modules.biz.bigscreen.controller;

import com.alibaba.fastjson.JSON;
import com.thhy.general.common.BasicResult;
import com.thhy.general.common.enums.ProListenType;
import com.thhy.general.config.GlobalConfig;
import com.thhy.general.utils.SpringContextUtils;
import com.thhy.screen.config.NetUtils;
import com.thhy.screen.modules.biz.bigscreen.entity.*;
import com.thhy.screen.modules.biz.bigscreen.mapper.BigScreenMapper;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.websocket.Session;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.function.Consumer;

/**
 * RocketMQ listener for production-monitoring messages.
 *
 * <p>Each message body is treated as a data-type key. For every open WebSocket session held in
 * {@code ProListenSocketServer.SESSIONS} the listener loads the matching data set via
 * {@link #getData(Session, String)} and pushes it to the session as JSON.
 *
 * <p>Collaborators are resolved through {@link SpringContextUtils} on each call rather than
 * cached in mutable fields, because {@code consumeMessage} may run on several consumer threads.
 */
@Component
public class ExecProListener implements MessageListenerConcurrently {

    private static final Logger logger = LoggerFactory.getLogger(ExecProListener.class);

    /** Number of redeliveries after which a failing message is dropped instead of retried. */
    private static final int MAX_RECONSUME_TIMES = 3;

    /**
     * Injected when this listener is created as a Spring bean. Data loading does not read this
     * field; it resolves the mapper through {@link #getBigScreenMapper()}.
     */
    @Autowired
    private BigScreenMapper bigScreenMapper;

    /** @return the {@link RedissonClient} bean looked up from the Spring context */
    public RedissonClient getRedissonClient() {
        return SpringContextUtils.getBean(RedissonClient.class);
    }

    /** @return the {@link GlobalConfig} bean looked up from the Spring context */
    public GlobalConfig getGlobalConfig() {
        return SpringContextUtils.getBean(GlobalConfig.class);
    }

    /** @return the {@link BigScreenMapper} bean looked up from the Spring context */
    public BigScreenMapper getBigScreenMapper() {
        return SpringContextUtils.getBean(BigScreenMapper.class);
    }

    /**
     * Broadcasts every message of the batch to the connected monitoring screens.
     *
     * <p>A runtime failure while handling a message requests redelivery of the batch until the
     * message has been redelivered {@value #MAX_RECONSUME_TIMES} times; after that the failure is
     * logged and the message is treated as consumed.
     *
     * @param list                       messages delivered by the broker; {@code null} or empty is
     *                                   acknowledged without doing any work
     * @param consumeConcurrentlyContext consume context supplied by RocketMQ (not used)
     * @return {@code RECONSUME_LATER} when a message failed and may still be retried, otherwise
     *         {@code CONSUME_SUCCESS}
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list,
                                                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        int messageCount = (list == null) ? 0 : list.size();
        logger.info("开始处理生产监控消息,个数为:{}", messageCount);
        if (messageCount == 0) {
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }

        // Handle every message: returning CONSUME_SUCCESS acknowledges the whole batch,
        // so anything skipped here would never be processed.
        for (MessageExt messageExt : list) {
            try {
                broadcast(new String(messageExt.getBody(), StandardCharsets.UTF_8));
            } catch (RuntimeException e) {
                logger.error("生产监控消息消费失败", e);
                if (messageExt.getReconsumeTimes() >= MAX_RECONSUME_TIMES) {
                    logger.info("尝试重新消费3次,都失败");
                } else {
                    logger.info("重试");
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        }

        logger.info("生产监控消息消费成功");
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

    /**
     * Pushes the data set selected by {@code msgBody} to every open monitoring session.
     *
     * <p>An {@link IOException} while writing to one session is logged and does not stop the push
     * to the remaining sessions.
     */
    private void broadcast(String msgBody) {
        logger.info("接收消息整体为:{}", msgBody);
        logger.info("生产监控终端处理收到消息{}", msgBody);

        for (Session session : ProListenSocketServer.SESSIONS) {
            // Check before loading data so closed sessions do not trigger any queries.
            if (!session.isOpen()) {
                continue;
            }
            try {
                List<Map<String, Object>> payload = getData(session, msgBody);
                session.getBasicRemote().sendText(JSON.toJSONString(BasicResult.success(payload)));
                logger.info("成功推送消息到生产监控大屏{}___{}",
                        session.getId(), session.getRequestURI().getHost());
            } catch (IOException e) {
                logger.warn("推送消息到生产监控大屏失败{}", session.getId(), e);
            }
        }
    }

    /**
     * Loads the data set identified by {@code dataType} for the company bound to the session.
     *
     * <p>The company id is read from the Redis bucket named
     * {@code <keyPrefix>:ws:company:<ip>-<serverPort>-<sessionId>}.
     *
     * <p>NOTE(review): {@code dataType} is compared with {@code ProListenType} members via
     * {@code equals}; this only matches if those members are {@code String} constants — confirm
     * against {@code ProListenType}.
     *
     * @param session  WebSocket session whose id selects the company bucket
     * @param dataType data-type key taken from the message body
     * @return one map per data section, each carrying a {@code "mod"} entry plus its data entry;
     *         an unrecognised {@code dataType} yields a single map containing only {@code "mod"}
     */
    public List<Map<String, Object>> getData(Session session, String dataType) {
        RedissonClient redisson = getRedissonClient();
        GlobalConfig config = getGlobalConfig();
        BigScreenMapper mapper = getBigScreenMapper();

        String host = NetUtils.getIp();
        RBucket<String> companyBucket = redisson.getBucket(
                config.getKeyPrefix() + ":ws:company:" + host + "-" + config.getServerPort() + "-" + session.getId());
        String companyId = companyBucket.get();

        List<Map<String, Object>> sections = new ArrayList<>();

        if (dataType.equals(ProListenType.PIPEYEARMONTH)) {
            // Segment yearly and monthly figures: plan vs. actual including the completion rate.
            List<?> projects = applyToEach(mapper.queryAllProject(companyId), project -> {
                PipePlanActual actual = mapper.pipePlanActual(companyId, project.getProId());
                actual.setPlanProduct(project.getPlanOutput());
                BigDecimal rate = BigDecimal.ZERO;
                if (actual.getRingCount() != 0 && project.getPlanOutput() != 0) {
                    // The ratio is rounded to two decimals first and then scaled to a percentage.
                    rate = new BigDecimal(actual.getRingCount())
                            .divide(new BigDecimal(project.getPlanOutput()), 2, RoundingMode.HALF_UP)
                            .multiply(BigDecimal.valueOf(100));
                }
                actual.setCompleteRate(rate);
                project.setPipePlanActual(actual);
            });
            sections.add(section(ProListenType.PIPEPLANACTUAL, "pipePlanActual", projects));
            return sections;
        }

        if (dataType.equals(ProListenType.SENDINFO)) {
            // Shipping: the shipping section is sent together with the storage-yard section.
            Map<String, Object> sendSection =
                    section(ProListenType.SENDINFO, "sendInfo", mapper.sendInfo(companyId));
            Map<String, Object> repoSection =
                    section(ProListenType.REPOINFO, "repoInfo", mapper.repoInfo(companyId));
            sections.add(sendSection);
            sections.add(repoSection);
            return sections;
        }

        // All remaining types produce a single section tagged with the incoming dataType.
        Map<String, Object> single = new HashMap<>();
        single.put("mod", dataType);
        if (dataType.equals(ProListenType.PIPEPLANACTUAL)) {
            // Yearly production plan and actual completion.
            List<?> projects = applyToEach(mapper.queryAllProject(companyId), project ->
                    project.setPipePlanActual(mapper.pipePlanActual(companyId, project.getProId())));
            single.put("pipePlanActual", projects);
        } else if (dataType.equals(ProListenType.WATERCULINFO)) {
            // Water curing pool.
            single.put("waterCulInfo", mapper.WaterCulInfo(companyId));
        } else if (dataType.equals(ProListenType.REPOINFO)) {
            // Storage yard.
            single.put("repoInfo", mapper.repoInfo(companyId));
        } else if (dataType.equals(ProListenType.STEELUSE)) {
            // Rebar consumption.
            single.put("steelUse", mapper.querySteelMake());
        } else if (dataType.equals(ProListenType.RESTINFO)) {
            // Buffer area.
            single.put("restInfo", mapper.queryRestInfo());
        }
        sections.add(single);
        return sections;
    }

    /** Builds one payload section holding the {@code "mod"} tag and a single data entry. */
    private static Map<String, Object> section(Object mod, String dataKey, Object data) {
        Map<String, Object> result = new HashMap<>();
        result.put("mod", mod);
        result.put(dataKey, data);
        return result;
    }

    /** Runs {@code action} on every element and returns the same list for further use. */
    private static <T> List<T> applyToEach(List<T> items, Consumer<? super T> action) {
        items.forEach(action);
        return items;
    }
}