package com.thhy.staff.modules.biz.platuser.controller;
|
|
import com.alibaba.fastjson.JSON;
|
import com.thhy.general.common.BasicResult;
|
import com.thhy.general.config.GlobalConfig;
|
import com.thhy.general.utils.SpringContextUtils;
|
import com.thhy.staff.modules.biz.platuser.service.PlatUserService;
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
|
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
|
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
|
import org.apache.rocketmq.common.message.MessageExt;
|
import org.redisson.api.RBucket;
|
import org.redisson.api.RedissonClient;
|
import org.slf4j.Logger;
|
import org.slf4j.LoggerFactory;
|
import org.springframework.stereotype.Component;
|
|
import javax.websocket.Session;
|
import java.io.IOException;
|
import java.io.UnsupportedEncodingException;
|
import java.util.ArrayList;
|
import java.util.List;
|
import java.util.Map;
|
import java.util.concurrent.CopyOnWriteArraySet;
|
|
@Component
|
public class ExecFaceListener implements MessageListenerConcurrently {
|
|
private PlatUserService userService;
|
|
public PlatUserService getUserService() {
|
return SpringContextUtils.getBean(PlatUserService.class);
|
}
|
private RedissonClient redissonClient;
|
|
private GlobalConfig globalConfig;
|
|
public GlobalConfig getGlobalConfig() {
|
return SpringContextUtils.getBean(GlobalConfig.class);
|
}
|
|
public PlatUserService getPlatUserService() {
|
return SpringContextUtils.getBean(PlatUserService.class);
|
}
|
|
public RedissonClient getRedissonClient() {
|
return SpringContextUtils.getBean(RedissonClient.class);
|
}
|
|
private Logger logger = LoggerFactory.getLogger(ExecFaceListener.class);
|
|
@Override
|
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
|
logger.info("开始处理人脸消息,个数为:"+list.size());
|
PlatUserService userService = getPlatUserService();
|
RedissonClient redissonClient = getRedissonClient();
|
GlobalConfig globalConfig = getGlobalConfig();
|
try {
|
if(list != null || list.size()>0){
|
MessageExt messageExt = list.get(0);
|
String msgBody = new String(messageExt.getBody(), "utf-8");
|
logger.info("人脸消息整体为:" + msgBody);
|
CopyOnWriteArraySet<Session> SESSIONS = FaceServer.SESSIONS;
|
SESSIONS.forEach(session ->{
|
List<Map<String,Object>> dataMapList = new ArrayList<>();
|
String host = NetUtils.getIp();
|
RBucket<String> rBucketSess = redissonClient.getBucket(globalConfig.getKeyPrefix()+":ws:facetoken:"+host+"-"+globalConfig.getServerPort()+"-"+session.getId());
|
String userToken = rBucketSess.get();
|
Map<String,Object> map = userService.todayWorkUser(userToken);
|
dataMapList.add(map);
|
Map<String,Object> map1 = userService.groupUserWork(userToken);
|
Map<String,Object> map2 = userService.faceRecord(userToken);
|
dataMapList.add(map1);
|
dataMapList.add(map2);
|
try {
|
session.getBasicRemote().sendText(JSON.toJSONString(dataMapList));
|
} catch (IOException e) {
|
throw new RuntimeException(e);
|
}
|
});
|
}
|
} catch (UnsupportedEncodingException e) {
|
logger.info("生产监控消息消费失败"+e.getMessage());
|
//throw new RuntimeException(e);
|
if(list.get(0).getReconsumeTimes()==3){
|
logger.info("尝试重新消费3次,都失败");
|
}else{
|
logger.info("重试");
|
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
|
}
|
}
|
logger.info("生产监控消息消费成功");
|
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
|
}
|
}
|