ReceiveData.java 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302
  1. package com.huaxu.rabbitmq;
  2. /**
  3. * @description 监听rabbitmq队列消息,获取设备报警信息
  4. * @auto wangli
  5. * @data 2020-11-18 21:23
  6. */
  7. import com.alibaba.fastjson.JSONObject;
  8. import com.huaxu.client.OperationManagerClient;
  9. import com.huaxu.client.UserCenterClient;
  10. import com.huaxu.common.CalcUtil;
  11. import com.huaxu.common.StringUtils;
  12. import com.huaxu.dao.AlarmDetailMapper;
  13. import com.huaxu.dto.AlarmDetailsDto;
  14. import com.huaxu.dto.DeviceCheckAlarmDto;
  15. import com.huaxu.dto.WorkOrderManageByAlarmDto;
  16. import com.huaxu.entity.AlarmDetailsEntity;
  17. import com.huaxu.entity.Message;
  18. import com.huaxu.entity.MonitorDataEntity;
  19. import com.huaxu.entity.MonitorDataValueEntity;
  20. import com.huaxu.service.MonitorDataService;
  21. import lombok.extern.slf4j.Slf4j;
  22. import org.apache.commons.lang.exception.ExceptionUtils;
  23. import org.springframework.amqp.core.AmqpTemplate;
  24. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  25. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  26. import org.springframework.beans.factory.annotation.Autowired;
  27. import org.springframework.beans.factory.annotation.Value;
  28. import org.springframework.context.annotation.Bean;
  29. import org.springframework.stereotype.Component;
  30. import javax.annotation.Resource;
  31. import java.awt.*;
  32. import java.text.ParseException;
  33. import java.text.SimpleDateFormat;
  34. import java.util.*;
  35. import java.util.List;
  36. import java.util.stream.Collectors;
  37. @Component
  38. @Slf4j
  39. public class ReceiveData {
  40. @Resource(name = "messageRabbitTemplate")
  41. private AmqpTemplate messageRabbitTemplate;
  42. @Resource(name = "receiveDataRabbitTemplate")
  43. private AmqpTemplate receiveDataRabbitTemplate;
  44. @Resource
  45. private AlarmDetailMapper alarmDetailMapper;
  46. @Autowired
  47. private UserCenterClient userCenterClient;
  48. @Value("${receiveData.spring.rabbitmq.listener.queue}")
  49. private String rabbitmqQueue;
  50. @Autowired
  51. private MonitorDataService monitorDataService;
  52. @Autowired
  53. private OperationManagerClient operationManagerClient;
  54. /**
  55. * 先注入,然后再通过SPEL取值
  56. * @return
  57. */
  58. @Bean
  59. public String rabbitmqQueue(){
  60. return rabbitmqQueue;
  61. }
  62. @Value("${receive.exchange.name}")
  63. private String receiveExchangeName;
  64. @Value("${dispath.routing.key}")
  65. private String dispathRoutingKey;
  66. private void send(Message message){
  67. log.debug("消息发送 exchange={} routingkey={} 用户id={}",receiveExchangeName,dispathRoutingKey,message.getUserId());
  68. messageRabbitTemplate.convertAndSend(receiveExchangeName,dispathRoutingKey, JSONObject.toJSONString(message));
  69. }
  70. @RabbitHandler
  71. @RabbitListener(queues = "#{rabbitmqQueue}",containerFactory = "receiveDataContainerFactory")
  72. public void received(byte[] receivedData) {
  73. try {
  74. log.debug("rabbitMq接收消息:"+new String(receivedData));
  75. receivedDataHandle(receivedData);
  76. } catch (Exception e) {
  77. log.error(ExceptionUtils.getStackTrace(e));
  78. // 发送异常时消息返回队列
  79. // receiveDataRabbitTemplate.convertAndSend(rabbitmqQueue, receivedData);
  80. }
  81. }
  82. public void receivedDataHandle(byte[] receivedData){
  83. //{"agentIdentifier":"balikun_lvyuantest_tcp_agent",
  84. //"eventTime":"2020-11-25 19:05:56","id":781234302422745088,"manufacturer":"lvyuantest","mode":"TCP-JSON",
  85. //"originalData":"{\"temp\":0,\"currentPress\":30,\"positiveFlow\":50,\"meterElectricity\":70,\"reverseFlow\":60,\"PH\":8.474,\"currentFlow\":20,\"VOL_STATUS\":1,\"EC\":0,\"currentFlowRate\":40}",
  86. //"originalDataFormat":"application/json",
  87. //"parsedData":"{\"temp\":0,\"currentPress\":30,\"positiveFlow\":50,\"meterElectricity\":70,\"reverseFlow\":60,\"PH\":8.474,\"currentFlow\":20,\"VOL_STATUS\":1,\"EC\":0,\"currentFlowRate\":40}",
  88. //"type":"水质监测仪","unitIdentifier":"HX_DH-001"}
  89. JSONObject jsonObject = JSONObject.parseObject(new String(receivedData));
  90. //对象有时间、有设备编码、有数据则解析
  91. if(jsonObject.containsKey("eventTime")&&jsonObject.containsKey("unitIdentifier")&&jsonObject.containsKey("parsedData")){
  92. SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  93. Date receiveDateTime ;
  94. try {
  95. receiveDateTime = f.parse(jsonObject.getString("eventTime"));
  96. }catch (ParseException e){
  97. log.error("报警信息查询时间转换错误,原数据:eventTime:{},异常信息:{}",jsonObject.containsKey("eventTime"),e.getMessage());
  98. return;
  99. }
  100. String devcieCode =jsonObject .getString("unitIdentifier");
  101. JSONObject receiveData =JSONObject.parseObject(jsonObject .getString("parsedData"));
  102. if(StringUtils.isBlank(devcieCode)){
  103. return ;
  104. }
  105. MonitorDataEntity monitorDataEntity = monitorDataService.getDeviceMonitorInfoByDeviceCode(devcieCode);
  106. //
  107. //查询不到设备或者设备属性为空
  108. if(monitorDataEntity == null
  109. || monitorDataEntity.getDataValues() == null
  110. || monitorDataEntity.getDataValues().size() == 0){
  111. return ;
  112. }
  113. List<MonitorDataValueEntity> monitorDataValueEntities= monitorDataEntity.getDataValues();
  114. Integer number = 0;
  115. for(MonitorDataValueEntity monitorDataValueEntity :monitorDataValueEntities){
  116. if(receiveData.containsKey(monitorDataValueEntity.getIdentifier())){
  117. number++;
  118. monitorDataValueEntity.setDataValue(receiveData.getDouble(monitorDataValueEntity.getIdentifier()));
  119. //单位问题处理
  120. }
  121. }
  122. //没有匹配到属性,视为垃圾数据忽略
  123. if(number == 0){
  124. return ;
  125. }
  126. monitorDataEntity.setCollectDate(receiveDateTime);
  127. Calendar cal = Calendar.getInstance();
  128. cal.setTime(receiveDateTime);
  129. monitorDataEntity.setYear(cal.get(Calendar.YEAR));
  130. monitorDataEntity.setMonth(cal.get(Calendar.MONTH)+1);
  131. monitorDataEntity.setDay(cal.get(Calendar.DAY_OF_MONTH));
  132. monitorDataEntity.setHour(cal.get(Calendar.HOUR_OF_DAY));
  133. monitorDataService.save(monitorDataEntity);
  134. //修改设备上报时间
  135. alarmDetailMapper.udpateLastUpdateTime(monitorDataEntity.getDeviceId(),receiveDateTime);
  136. //获取需要验证的报警条件
  137. List<DeviceCheckAlarmDto> deviceCheckAlarmDtos = alarmDetailMapper.selectDeviceForCheckAlarm(monitorDataEntity.getDeviceId(),"参数报警");
  138. //有设置报警参数
  139. if(deviceCheckAlarmDtos.size()>0 && StringUtils.isNotBlank(deviceCheckAlarmDtos.get(0).getIdentifiter())){
  140. Integer deviceId= deviceCheckAlarmDtos.get(0).getDeviceId();
  141. //系统中已存在的报警信息
  142. List<AlarmDetailsDto> alarmDetailsDtos = alarmDetailMapper.selectStateAlarm(deviceId,"参数报警",null);
  143. //已存在的报警信息转为map方便匹配alarmSettingId
  144. Map<Integer,AlarmDetailsDto> alarmDetailsDtoMap = alarmDetailsDtos.stream().collect(Collectors.toMap(AlarmDetailsDto::getAlarmSettingId, a -> a,(k1, k2)->k1));
  145. List<AlarmDetailsEntity> insert =new ArrayList<>();
  146. List<AlarmDetailsEntity> update =new ArrayList<>();
  147. //校验各参数异常情况
  148. for(DeviceCheckAlarmDto deviceCheckAlarmDto:deviceCheckAlarmDtos){
  149. //获取参数接收值
  150. Double receivedValue = receiveData.getDouble(deviceCheckAlarmDto.getIdentifiter());
  151. if(deviceCheckAlarmDto.checkdeviceAttributeAlarm(receivedValue)){
  152. //判断报警是否已存在
  153. if(alarmDetailsDtoMap.containsKey(deviceCheckAlarmDto.getAlarmSettingId())){
  154. AlarmDetailsDto alarmDetailsDto = alarmDetailsDtoMap.get(deviceCheckAlarmDto.getAlarmSettingId());
  155. alarmDetailsDto.setAlarmValue(receivedValue);
  156. alarmDetailsDto.setAlarmContent(deviceCheckAlarmDto.getAlarminfo(receivedValue));
  157. alarmDetailsDto.setMinValue(CalcUtil.compareBySign(alarmDetailsDto.getMinValue(),receivedValue, "<")?alarmDetailsDto.getMinValue():receivedValue);
  158. alarmDetailsDto.setMaxValue(CalcUtil.compareBySign(alarmDetailsDto.getMaxValue(),receivedValue,">")?alarmDetailsDto.getMaxValue():receivedValue);
  159. // update.add(alarmDetailsDto);
  160. alarmDetailsDto.setDateUpdate( new Date());
  161. alarmDetailMapper.update(alarmDetailsDto);
  162. update.add(alarmDetailsDto);
  163. //已存在的修改后从集合移除
  164. alarmDetailsDtoMap.remove(deviceCheckAlarmDto.getAlarmSettingId());
  165. }else{
  166. AlarmDetailsEntity alarmDetailsEntity = new AlarmDetailsDto();
  167. alarmDetailsEntity.setTenantId(deviceCheckAlarmDto.getTenantId());
  168. alarmDetailsEntity.setParentSceneId(deviceCheckAlarmDto.getParentSceneId());
  169. alarmDetailsEntity.setParentSceneName(deviceCheckAlarmDto.getParentSceneName());
  170. alarmDetailsEntity.setSceneId(deviceCheckAlarmDto.getSceneId());
  171. alarmDetailsEntity.setSceneName(deviceCheckAlarmDto.getSceneName());
  172. alarmDetailsEntity.setDeviceId(deviceCheckAlarmDto.getDeviceId());
  173. alarmDetailsEntity.setCompanyOrgId(deviceCheckAlarmDto.getCompanyOrgId());
  174. alarmDetailsEntity.setDeptOrgId(deviceCheckAlarmDto.getDeptOrgId());
  175. alarmDetailsEntity.setAlarmType(deviceCheckAlarmDto.getAlarmType());
  176. alarmDetailsEntity.setAttributeId(deviceCheckAlarmDto.getAttributeId());
  177. alarmDetailsEntity.setAlarmValue(receivedValue);
  178. alarmDetailsEntity.setAlarmStartTime(receiveDateTime);
  179. alarmDetailsEntity.setAlarmContent(deviceCheckAlarmDto.getAlarminfo(receivedValue));
  180. alarmDetailsEntity.setAlarmSettingId(deviceCheckAlarmDto.getAlarmSettingId());
  181. alarmDetailsEntity.setState(1);
  182. alarmDetailsEntity.setOpState(1);
  183. alarmDetailsEntity.setMinValue(receivedValue);
  184. alarmDetailsEntity.setMaxValue(receivedValue);
  185. alarmDetailsEntity.setStatus(1);
  186. alarmDetailsEntity.setDateCreate(new Date());
  187. alarmDetailsEntity.setDateUpdate(new Date());
  188. alarmDetailsEntity.setCreateBy("system");
  189. alarmDetailsEntity.setUpdateBy("system");
  190. Message message=new Message();
  191. message.setStatus(1);
  192. message.setTenantId(deviceCheckAlarmDto.getTenantId());
  193. message.setMessageId(UUID.randomUUID().toString());
  194. JSONObject jsonContent = new JSONObject();
  195. //${场景名称}${设备名称}【${报警字段}】异常报警
  196. jsonContent.put("场景名称",deviceCheckAlarmDto.getSceneName());
  197. jsonContent.put("设备名称",deviceCheckAlarmDto.getDeviceName());
  198. jsonContent.put("报警字段",deviceCheckAlarmDto.getAttributeName());
  199. message.setMessageContent(jsonContent.toJSONString()); //消息内容,如果需要动态使用,配合模板使用{key:value}
  200. message.setMessageType(1); //消息类型
  201. message.setMessageTemplateId(1); //模板id
  202. message.setChannel(0); //渠道
  203. Integer companyOrgId = deviceCheckAlarmDto.getCompanyOrgId();
  204. Integer departmentOrgId = deviceCheckAlarmDto.getDeptOrgId();
  205. try{
  206. List<Integer> taskUsers = userCenterClient.findUserIdsByPermissonOrg(deviceCheckAlarmDto.getTenantId(),companyOrgId,departmentOrgId);
  207. if(taskUsers!=null){
  208. taskUsers.forEach(id->{
  209. message.setUserId(id);
  210. this.send(message);
  211. });
  212. }
  213. }catch(Exception e){
  214. e.printStackTrace();
  215. log.info("推送报警消息失败:{}",e.getMessage());
  216. }
  217. insert.add(alarmDetailsEntity);
  218. }
  219. }
  220. }
  221. //批量插入新增报警
  222. if(insert.size()>0){
  223. alarmDetailMapper.batchInsert(insert);
  224. }
  225. if(alarmDetailsDtoMap.values().size() > 0){
  226. //处理完成后,剩下的标记为历史数据
  227. List<Long> deviceIds = new ArrayList<>();
  228. deviceIds.add(monitorDataEntity.getDeviceId().longValue());
  229. List<WorkOrderManageByAlarmDto> workOrders = operationManagerClient.findWorkOrderByDeviceIds(deviceIds);
  230. if(workOrders.size()>0){
  231. WorkOrderManageByAlarmDto orderManageByAlarmDto = workOrders.get(0);
  232. String taskDesc = orderManageByAlarmDto.getTaskDesc();
  233. //标记为历史数据 并 修改工单描述
  234. for(AlarmDetailsDto alarmDetailsDto : alarmDetailsDtoMap.values()){
  235. alarmDetailsDto.setState(0);
  236. alarmDetailsDto.setAlarmEndTime(receiveDateTime);
  237. alarmDetailsDto.setDateUpdate(new Date());
  238. alarmDetailMapper.update(alarmDetailsDto);
  239. taskDesc = taskDesc.replace(";"+alarmDetailsDto.getAlarmContent(),"")
  240. .replace(alarmDetailsDto.getAlarmContent(),"");
  241. }
  242. orderManageByAlarmDto.setTaskDesc(taskDesc);
  243. if(orderManageByAlarmDto != null
  244. && (orderManageByAlarmDto.getOrderStatus() == null || orderManageByAlarmDto.getOrderStatus() == 0)
  245. && StringUtils.isBlank(taskDesc)){
  246. //工单未派单且无报警信息,删除工单
  247. operationManagerClient.batchDeleteByAlarms(new ArrayList<>(deviceIds));
  248. }else{
  249. //其他情况修改描述
  250. operationManagerClient.updateByAlarms(workOrders);
  251. }
  252. }
  253. }
  254. //报警恢复的删除工单
  255. // Map<Integer,AlarmDetailsEntity> AlarmDtoOldsMap = update.stream().collect(Collectors.toMap(AlarmDetailsEntity::getDeviceId, a -> a,(k1, k2)->k1));
  256. // Set<Long> deviceIds = alarmDetailsDtoMap.values().stream()
  257. // .filter(a -> !AlarmDtoOldsMap.containsKey(a.getDeviceId()))
  258. // .map(a -> a.getDeviceId().longValue())
  259. // .collect(Collectors.toSet());
  260. // if(deviceIds.size()>0){
  261. // operationManagerClient.batchDeleteByAlarms(new ArrayList<>(deviceIds));
  262. // }
  263. }
  264. }
  265. }
  266. }