张晓波
2023-09-19 164694c47c35d6654df69b533e8dbf8b5423efc5
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
package com.thhy.staff.modules.biz.platuser.controller;
 
import com.alibaba.fastjson.JSON;
import com.thhy.general.common.BasicResult;
import com.thhy.general.config.GlobalConfig;
import com.thhy.general.utils.SpringContextUtils;
import com.thhy.staff.modules.biz.platuser.service.PlatUserService;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.redisson.api.RBucket;
import org.redisson.api.RedissonClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
 
import javax.websocket.Session;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArraySet;
 
@Component
public class ExecFaceListener implements MessageListenerConcurrently {
 
    private PlatUserService userService;
 
    public PlatUserService getUserService() {
        return SpringContextUtils.getBean(PlatUserService.class);
    }
    private RedissonClient redissonClient;
 
    private GlobalConfig globalConfig;
 
    public GlobalConfig getGlobalConfig() {
        return SpringContextUtils.getBean(GlobalConfig.class);
    }
 
    public PlatUserService getPlatUserService() {
        return SpringContextUtils.getBean(PlatUserService.class);
    }
 
    public RedissonClient getRedissonClient() {
        return SpringContextUtils.getBean(RedissonClient.class);
    }
 
    private Logger logger = LoggerFactory.getLogger(ExecFaceListener.class);
 
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        logger.info("开始处理人脸消息,个数为:"+list.size());
        PlatUserService userService = getPlatUserService();
        RedissonClient redissonClient = getRedissonClient();
        GlobalConfig globalConfig = getGlobalConfig();
        try {
            if(list != null || list.size()>0){
                MessageExt messageExt = list.get(0);
                String msgBody = new String(messageExt.getBody(), "utf-8");
                logger.info("人脸消息整体为:" + msgBody);
                CopyOnWriteArraySet<Session> SESSIONS = FaceServer.SESSIONS;
                SESSIONS.forEach(session ->{
                    List<Map<String,Object>> dataMapList = new ArrayList<>();
                    String host = NetUtils.getIp();
                    RBucket<String> rBucketSess = redissonClient.getBucket(globalConfig.getKeyPrefix()+":ws:facetoken:"+host+"-"+globalConfig.getServerPort()+"-"+session.getId());
                    String userToken = rBucketSess.get();
                    Map<String,Object> map = userService.todayWorkUser(userToken);
                    dataMapList.add(map);
                    Map<String,Object> map1 = userService.groupUserWork(userToken);
                    Map<String,Object> map2 = userService.faceRecord(userToken);
                    dataMapList.add(map1);
                    dataMapList.add(map2);
                    try {
                        session.getBasicRemote().sendText(JSON.toJSONString(dataMapList));
                    } catch (IOException e) {
                        throw new RuntimeException(e);
                    }
                });
            }
        } catch (UnsupportedEncodingException e) {
            logger.info("生产监控消息消费失败"+e.getMessage());
            //throw new RuntimeException(e);
            if(list.get(0).getReconsumeTimes()==3){
                logger.info("尝试重新消费3次,都失败");
            }else{
                logger.info("重试");
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }
        logger.info("生产监控消息消费成功");
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}