李旭东
2023-12-08 7efc6ed86025b610cab109a2e9f83362740d8ed4
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package com.thhy.screen.modules.biz.bigscreen.controller;
 
import com.alibaba.fastjson.JSON;
import com.thhy.general.common.BasicResult;
import com.thhy.general.common.enums.ProListenType;
import com.thhy.general.config.GlobalConfig;
import com.thhy.general.utils.SpringContextUtils;
import com.thhy.screen.config.NetUtils;
import com.thhy.screen.modules.biz.bigscreen.entity.*;
import com.thhy.screen.modules.biz.bigscreen.mapper.BigScreenMapper;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.Session;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
 
@Component
public class ExecProListener implements MessageListenerConcurrently {
 
 
    private RedissonClient redissonClient;
 
    public RedissonClient getRedissonClient() {
        return SpringContextUtils.getBean(RedissonClient.class);
    }
 
 
    private GlobalConfig globalConfig;
 
    public GlobalConfig getGlobalConfig() {
        return SpringContextUtils.getBean(GlobalConfig.class);
    }
 
    @Autowired
    private BigScreenMapper bigScreenMapper;
 
    public BigScreenMapper getBigScreenMapper() {
        return SpringContextUtils.getBean(BigScreenMapper.class);
    }
 
    private Logger logger = LoggerFactory.getLogger(ExecProListener.class);
 
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        logger.info("开始处理生产监控消息,个数为:"+list.size());
        try {
            if(list != null || list.size()>0){
                MessageExt messageExt = list.get(0);
                String msgBody = new String(messageExt.getBody(), "utf-8");
                logger.info("接收消息整体为:" + msgBody);
                logger.info("生产监控终端处理收到消息"+msgBody);
                CopyOnWriteArraySet<Session> SESSIONS = ProListenSocketServer.SESSIONS;
                SESSIONS.forEach(session ->{
                    try {
                        List<Map<String,Object>> mapList = getData(session,msgBody);
                        if (session.isOpen()) {
                            session.getBasicRemote().sendText(JSON.toJSONString(BasicResult.success(mapList)));
                            logger.info("成功推送消息到生产监控大屏"+session.getId()+"___"+session.getRequestURI().getHost());
                        }
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });
            }
        } catch (UnsupportedEncodingException e) {
            logger.info("生产监控消息消费失败"+e.getMessage());
            //throw new RuntimeException(e);
            if(list.get(0).getReconsumeTimes()==3){
                logger.info("尝试重新消费3次,都失败");
            }else{
                logger.info("重试");
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        logger.info("生产监控消息消费成功");
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
 
    public List<Map<String,Object>> getData(Session session,String dataType){
        redissonClient = getRedissonClient();
        globalConfig = getGlobalConfig();
        bigScreenMapper = getBigScreenMapper();
        String host = NetUtils.getIp();
        RBucket<String> companyRBucket = redissonClient.getBucket(globalConfig.getKeyPrefix()+":ws:company:"+host+"-"+globalConfig.getServerPort()+"-"+session.getId());
        String companyId = companyRBucket.get();
        List<Map<String,Object>> mapList = new ArrayList<>();
        Map<String,Object> map = new HashMap<>();
        map.put("mod",dataType);
        if(dataType.equals(ProListenType.PIPEYEARMONTH)){
            //管片年度和月度
            Map<String,Object> map1 = new HashMap<>();
            List<PipeYearCount> yearCountList = bigScreenMapper.queryAllProject(companyId);
            yearCountList.forEach(yc ->{
                PipePlanActual pipePlanActual = bigScreenMapper.pipePlanActual(companyId,yc.getProId());
                pipePlanActual.setPlanProduct(yc.getPlanOutput());
                BigDecimal rate = new BigDecimal(0);
                if(pipePlanActual.getRingCount()!=0&&yc.getPlanOutput()!=0){
                    rate = new BigDecimal(pipePlanActual.getRingCount()).divide(new BigDecimal(yc.getPlanOutput()),2, RoundingMode.HALF_UP).multiply(new BigDecimal(100));
                }
                pipePlanActual.setCompleteRate(rate);
                yc.setPipePlanActual(pipePlanActual);
            });
            map1.put("mod",ProListenType.PIPEPLANACTUAL);
            map1.put("pipePlanActual",yearCountList);
            mapList.add(map1);
            return mapList;
        }else if (dataType.equals(ProListenType.PIPEPLANACTUAL)){
            //年度生产计划和实际完成
            List<PipeYearCount> yearCountList = bigScreenMapper.queryAllProject(companyId);
            yearCountList.forEach(yc ->{
                PipePlanActual pipePlanActual = bigScreenMapper.pipePlanActual(companyId,yc.getProId());
                yc.setPipePlanActual(pipePlanActual);
            });
            map.put("pipePlanActual",yearCountList);
        }else if (dataType.equals(ProListenType.WATERCULINFO)){
            //水养池
            List<WaterCulInfo> culInfoList = bigScreenMapper.WaterCulInfo(companyId);
            map.put("waterCulInfo",culInfoList);
        }else if (dataType.equals(ProListenType.REPOINFO)){
            //堆场
            List<RepoInfo> repoInfoList = bigScreenMapper.repoInfo(companyId);
            map.put("repoInfo",repoInfoList);
        }else if (dataType.equals(ProListenType.SENDINFO)){
            //发运
            Map<String,Object> map1 = new HashMap<>();
            List<SendInfo> sendInfoList = bigScreenMapper.sendInfo(companyId);
            map1.put("mod",ProListenType.SENDINFO);
            map1.put("sendInfo",sendInfoList);
 
            Map<String,Object> map2 = new HashMap<>();
            List<RepoInfo> repoInfoList = bigScreenMapper.repoInfo(companyId);
            map2.put("mod",ProListenType.REPOINFO);
            map2.put("repoInfo",repoInfoList);
            mapList.add(map1);
            mapList.add(map2);
            return mapList;
        }else if (dataType.equals(ProListenType.STEELUSE)){
            //钢筋消耗
            List<HashMap<String,Object>> steelUseList = bigScreenMapper.querySteelMake();
            map.put("steelUse",steelUseList);
        }else if (dataType.equals(ProListenType.RESTINFO)){
            //缓存区
            List<HashMap<String,Object>> restInfo = bigScreenMapper.queryRestInfo();
            map.put("restInfo",restInfo);
        }
        mapList.add(map);
        return mapList;
    }
}