package com.thhy.staff.modules.biz.platuser.controller; import com.alibaba.fastjson.JSON; import com.thhy.general.common.BasicResult; import com.thhy.general.config.GlobalConfig; import com.thhy.general.utils.SpringContextUtils; import com.thhy.staff.modules.biz.platuser.service.PlatUserService; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently; import org.apache.rocketmq.common.message.MessageExt; import org.redisson.api.RBucket; import org.redisson.api.RedissonClient; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; import javax.websocket.Session; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.concurrent.CopyOnWriteArraySet; @Component public class ExecFaceListener implements MessageListenerConcurrently { private PlatUserService userService; public PlatUserService getUserService() { return SpringContextUtils.getBean(PlatUserService.class); } private RedissonClient redissonClient; private GlobalConfig globalConfig; public GlobalConfig getGlobalConfig() { return SpringContextUtils.getBean(GlobalConfig.class); } public PlatUserService getPlatUserService() { return SpringContextUtils.getBean(PlatUserService.class); } public RedissonClient getRedissonClient() { return SpringContextUtils.getBean(RedissonClient.class); } private Logger logger = LoggerFactory.getLogger(ExecFaceListener.class); @Override public ConsumeConcurrentlyStatus consumeMessage(List list, ConsumeConcurrentlyContext consumeConcurrentlyContext) { logger.info("开始处理人脸消息,个数为:"+list.size()); PlatUserService userService = getPlatUserService(); RedissonClient redissonClient = getRedissonClient(); GlobalConfig globalConfig = getGlobalConfig(); try { if(list != null || list.size()>0){ MessageExt messageExt = list.get(0); String msgBody = new String(messageExt.getBody(), "utf-8"); logger.info("人脸消息整体为:" + msgBody); CopyOnWriteArraySet SESSIONS = FaceServer.SESSIONS; SESSIONS.forEach(session ->{ List> dataMapList = new ArrayList<>(); String host = NetUtils.getIp(); RBucket rBucketSess = redissonClient.getBucket(globalConfig.getKeyPrefix()+":ws:facetoken:"+host+"-"+globalConfig.getServerPort()+"-"+session.getId()); String userToken = rBucketSess.get(); Map map = userService.todayWorkUser(userToken); dataMapList.add(map); Map map1 = userService.groupUserWork(userToken); Map map2 = userService.faceRecord(userToken); dataMapList.add(map1); dataMapList.add(map2); try { session.getBasicRemote().sendText(JSON.toJSONString(dataMapList)); } catch (IOException e) { throw new RuntimeException(e); } }); } } catch (UnsupportedEncodingException e) { logger.info("生产监控消息消费失败"+e.getMessage()); //throw new RuntimeException(e); if(list.get(0).getReconsumeTimes()==3){ logger.info("尝试重新消费3次,都失败"); }else{ logger.info("重试"); return ConsumeConcurrentlyStatus.RECONSUME_LATER; } } logger.info("生产监控消息消费成功"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }