ReceiveData.java 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267
  1. package com.huaxu.rabbitmq;
  2. /**
  3. * @description Listens for RabbitMQ queue messages to obtain device alarm information
  4. * @author wangli
  5. * @date 2020-11-18 21:23
  6. */
  7. import com.alibaba.fastjson.JSONObject;
  8. import com.huaxu.client.OperationManagerClient;
  9. import com.huaxu.client.UserCenterClient;
  10. import com.huaxu.common.CalcUtil;
  11. import com.huaxu.common.StringUtils;
  12. import com.huaxu.dao.AlarmDetailMapper;
  13. import com.huaxu.dto.AlarmDetailsDto;
  14. import com.huaxu.dto.DeviceCheckAlarmDto;
  15. import com.huaxu.entity.AlarmDetailsEntity;
  16. import com.huaxu.entity.Message;
  17. import com.huaxu.entity.MonitorDataEntity;
  18. import com.huaxu.entity.MonitorDataValueEntity;
  19. import com.huaxu.service.MonitorDataService;
  20. import com.huaxu.util.MessageSendUtil;
  21. import lombok.extern.slf4j.Slf4j;
  22. import org.apache.commons.lang.exception.ExceptionUtils;
  23. import org.springframework.amqp.core.AmqpTemplate;
  24. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  25. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  26. import org.springframework.beans.factory.annotation.Autowired;
  27. import org.springframework.beans.factory.annotation.Value;
  28. import org.springframework.context.annotation.Bean;
  29. import org.springframework.stereotype.Component;
  30. import javax.annotation.Resource;
  31. import java.text.ParseException;
  32. import java.text.SimpleDateFormat;
  33. import java.util.*;
  34. import java.util.stream.Collectors;
  35. @Component
  36. @Slf4j
  37. public class ReceiveData {
  38. @Autowired
  39. private AmqpTemplate rabbitTemplate;
  40. @Resource
  41. private AlarmDetailMapper alarmDetailMapper;
  42. @Autowired
  43. private UserCenterClient userCenterClient;
  44. @Autowired
  45. private MessageSendUtil messageSendUtil;
  46. @Value("${spring.rabbitmq.listener.queue}")
  47. private String rabbitmqQueue;
  48. @Autowired
  49. private MonitorDataService monitorDataService;
  50. @Autowired
  51. private OperationManagerClient operationManagerClient;
  52. /**
  53. * 先注入,然后再通过SPEL取值
  54. * @return
  55. */
  56. @Bean
  57. public String rabbitmqQueue(){
  58. return rabbitmqQueue;
  59. }
  60. @RabbitHandler
  61. @RabbitListener(queues = "#{rabbitmqQueue}")
  62. public void received(byte[] receivedData) {
  63. try {
  64. log.info("rabbitMq接收消息:"+new String(receivedData));
  65. receivedDataHandle(receivedData);
  66. } catch (Exception e) {
  67. log.error(ExceptionUtils.getStackTrace(e));
  68. // 发送异常时消息返回队列
  69. rabbitTemplate.convertAndSend(rabbitmqQueue, receivedData);
  70. }
  71. }
  72. public void receivedDataHandle(byte[] receivedData){
  73. //{"agentIdentifier":"balikun_lvyuantest_tcp_agent",
  74. //"eventTime":"2020-11-25 19:05:56","id":781234302422745088,"manufacturer":"lvyuantest","mode":"TCP-JSON",
  75. //"originalData":"{\"temp\":0,\"currentPress\":30,\"positiveFlow\":50,\"meterElectricity\":70,\"reverseFlow\":60,\"PH\":8.474,\"currentFlow\":20,\"VOL_STATUS\":1,\"EC\":0,\"currentFlowRate\":40}",
  76. //"originalDataFormat":"application/json",
  77. //"parsedData":"{\"temp\":0,\"currentPress\":30,\"positiveFlow\":50,\"meterElectricity\":70,\"reverseFlow\":60,\"PH\":8.474,\"currentFlow\":20,\"VOL_STATUS\":1,\"EC\":0,\"currentFlowRate\":40}",
  78. //"type":"水质监测仪","unitIdentifier":"HX_DH-001"}
  79. JSONObject jsonObject = JSONObject.parseObject(new String(receivedData));
  80. //对象有时间、有设备编码、有数据则解析
  81. if(jsonObject.containsKey("eventTime")&&jsonObject.containsKey("unitIdentifier")&&jsonObject.containsKey("parsedData")){
  82. SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  83. Date receiveDateTime ;
  84. try {
  85. receiveDateTime = f.parse(jsonObject.getString("eventTime"));
  86. }catch (ParseException e){
  87. log.error("报警信息查询时间转换错误,原数据:eventTime:{},异常信息:{}",jsonObject.containsKey("eventTime"),e.getMessage());
  88. return;
  89. }
  90. String devcieCode =jsonObject .getString("unitIdentifier");
  91. JSONObject receiveData =JSONObject.parseObject(jsonObject .getString("parsedData"));
  92. if(StringUtils.isBlank(devcieCode)){
  93. return ;
  94. }
  95. MonitorDataEntity monitorDataEntity = monitorDataService.getDeviceMonitorInfoByDeviceCode(devcieCode);
  96. //
  97. //查询不到设备或者设备属性为空
  98. if(monitorDataEntity == null
  99. || monitorDataEntity.getDataValues() == null
  100. || monitorDataEntity.getDataValues().size() == 0){
  101. return ;
  102. }
  103. List<MonitorDataValueEntity> monitorDataValueEntities= monitorDataEntity.getDataValues();
  104. Integer number = 0;
  105. for(MonitorDataValueEntity monitorDataValueEntity :monitorDataValueEntities){
  106. if(receiveData.containsKey(monitorDataValueEntity.getIdentifier())){
  107. number++;
  108. monitorDataValueEntity.setDataValue(receiveData.getDouble(monitorDataValueEntity.getIdentifier()));
  109. //单位问题处理
  110. }
  111. }
  112. //没有匹配到属性,视为垃圾数据忽略
  113. if(number == 0){
  114. return ;
  115. }
  116. monitorDataEntity.setCollectDate(receiveDateTime);
  117. Calendar cal = Calendar.getInstance();
  118. cal.setTime(receiveDateTime);
  119. monitorDataEntity.setYear(cal.get(Calendar.YEAR));
  120. monitorDataEntity.setMonth(cal.get(Calendar.MONTH)+1);
  121. monitorDataEntity.setDay(cal.get(Calendar.DAY_OF_MONTH));
  122. monitorDataEntity.setHour(cal.get(Calendar.HOUR_OF_DAY));
  123. monitorDataService.save(monitorDataEntity);
  124. //修改设备上报时间
  125. alarmDetailMapper.udpateLastUpdateTime(monitorDataEntity.getDeviceId(),receiveDateTime);
  126. //获取需要验证的报警条件
  127. List<DeviceCheckAlarmDto> deviceCheckAlarmDtos = alarmDetailMapper.selectDeviceForCheckAlarm(monitorDataEntity.getDeviceId(),"参数报警");
  128. //有设置报警参数
  129. if(deviceCheckAlarmDtos.size()>0 && StringUtils.isNotBlank(deviceCheckAlarmDtos.get(0).getIdentifiter())){
  130. Integer deviceId= deviceCheckAlarmDtos.get(0).getDeviceId();
  131. //系统中已存在的报警信息
  132. List<AlarmDetailsDto> alarmDetailsDtos = alarmDetailMapper.selectStateAlarm(deviceId,"参数报警");
  133. //已存在的报警信息转为map方便匹配alarmSettingId
  134. Map<Integer,AlarmDetailsDto> alarmDetailsDtoMap = alarmDetailsDtos.stream().collect(Collectors.toMap(AlarmDetailsDto::getAlarmSettingId, a -> a,(k1, k2)->k1));
  135. List<AlarmDetailsEntity> insert =new ArrayList<>();
  136. List<AlarmDetailsEntity> update =new ArrayList<>();
  137. //校验各参数异常情况
  138. for(DeviceCheckAlarmDto deviceCheckAlarmDto:deviceCheckAlarmDtos){
  139. //获取参数接收值
  140. Double receivedValue = receiveData.getDouble(deviceCheckAlarmDto.getIdentifiter());
  141. if(deviceCheckAlarmDto.checkdeviceAttributeAlarm(receivedValue)){
  142. //判断报警是否已存在
  143. if(alarmDetailsDtoMap.containsKey(deviceCheckAlarmDto.getAlarmSettingId())){
  144. AlarmDetailsDto alarmDetailsDto = alarmDetailsDtoMap.get(deviceCheckAlarmDto.getAlarmSettingId());
  145. alarmDetailsDto.setAlarmValue(receivedValue);
  146. alarmDetailsDto.setAlarmContent(deviceCheckAlarmDto.getAlarminfo(receivedValue));
  147. alarmDetailsDto.setMinValue(CalcUtil.compareBySign(alarmDetailsDto.getMinValue(),receivedValue, "<")?alarmDetailsDto.getMinValue():receivedValue);
  148. alarmDetailsDto.setMaxValue(CalcUtil.compareBySign(alarmDetailsDto.getMaxValue(),receivedValue,">")?alarmDetailsDto.getMaxValue():receivedValue);
  149. // update.add(alarmDetailsDto);
  150. alarmDetailsDto.setDateUpdate( new Date());
  151. alarmDetailMapper.update(alarmDetailsDto);
  152. update.add(alarmDetailsDto);
  153. //已存在的修改后从集合移除
  154. alarmDetailsDtoMap.remove(deviceCheckAlarmDto.getAlarmSettingId());
  155. }else{
  156. AlarmDetailsEntity alarmDetailsEntity = new AlarmDetailsDto();
  157. alarmDetailsEntity.setTenantId(deviceCheckAlarmDto.getTenantId());
  158. alarmDetailsEntity.setParentSceneId(deviceCheckAlarmDto.getParentSceneId());
  159. alarmDetailsEntity.setParentSceneName(deviceCheckAlarmDto.getParentSceneName());
  160. alarmDetailsEntity.setSceneId(deviceCheckAlarmDto.getSceneId());
  161. alarmDetailsEntity.setSceneName(deviceCheckAlarmDto.getSceneName());
  162. alarmDetailsEntity.setDeviceId(deviceCheckAlarmDto.getDeviceId());
  163. alarmDetailsEntity.setCompanyOrgId(deviceCheckAlarmDto.getCompanyOrgId());
  164. alarmDetailsEntity.setDeptOrgId(deviceCheckAlarmDto.getDeptOrgId());
  165. alarmDetailsEntity.setAlarmType(deviceCheckAlarmDto.getAlarmType());
  166. alarmDetailsEntity.setAttributeId(deviceCheckAlarmDto.getAttributeId());
  167. alarmDetailsEntity.setAlarmValue(receivedValue);
  168. alarmDetailsEntity.setAlarmStartTime(receiveDateTime);
  169. alarmDetailsEntity.setAlarmContent(deviceCheckAlarmDto.getAlarminfo(receivedValue));
  170. alarmDetailsEntity.setAlarmSettingId(deviceCheckAlarmDto.getAlarmSettingId());
  171. alarmDetailsEntity.setState(1);
  172. alarmDetailsEntity.setOpState(1);
  173. alarmDetailsEntity.setMinValue(receivedValue);
  174. alarmDetailsEntity.setMaxValue(receivedValue);
  175. alarmDetailsEntity.setStatus(1);
  176. alarmDetailsEntity.setDateCreate(new Date());
  177. alarmDetailsEntity.setDateUpdate(new Date());
  178. alarmDetailsEntity.setCreateBy("system");
  179. alarmDetailsEntity.setUpdateBy("system");
  180. Message message=new Message();
  181. message.setStatus(1);
  182. message.setTenantId(deviceCheckAlarmDto.getTenantId());
  183. message.setMessageId(UUID.randomUUID().toString());
  184. JSONObject jsonContent = new JSONObject();
  185. //${场景名称}${设备名称}【${报警字段}】异常报警
  186. jsonContent.put("场景名称",deviceCheckAlarmDto.getSceneName());
  187. jsonContent.put("设备名称",deviceCheckAlarmDto.getDeviceName());
  188. jsonContent.put("报警字段",deviceCheckAlarmDto.getAttributeName());
  189. message.setMessageContent(jsonContent.toJSONString()); //消息内容,如果需要动态使用,配合模板使用{key:value}
  190. message.setMessageType(1); //消息类型
  191. message.setMessageTemplateId(1); //模板id
  192. message.setChannel(0); //渠道
  193. Integer companyOrgId = deviceCheckAlarmDto.getCompanyOrgId();
  194. Integer departmentOrgId = deviceCheckAlarmDto.getDeptOrgId();
  195. try{
  196. List<Integer> taskUsers = userCenterClient.findUserIdsByPermissonOrg(deviceCheckAlarmDto.getTenantId(),companyOrgId,departmentOrgId);
  197. if(taskUsers!=null){
  198. taskUsers.forEach(id->{
  199. message.setUserId(id);
  200. messageSendUtil.send(message);
  201. });
  202. }
  203. }catch(Exception e){
  204. e.printStackTrace();
  205. log.info("推送报警消息失败:{}",e.getMessage());
  206. }
  207. insert.add(alarmDetailsEntity);
  208. }
  209. }
  210. }
  211. //处理完成后,剩下的标记为历史数据
  212. for(AlarmDetailsDto alarmDetailsDto:alarmDetailsDtoMap.values()){
  213. alarmDetailsDto.setState(0);
  214. alarmDetailsDto.setAlarmEndTime(receiveDateTime);
  215. alarmDetailsDto.setDateUpdate(new Date());
  216. alarmDetailMapper.update(alarmDetailsDto);
  217. }
  218. //批量插入新增报警
  219. if(insert.size()>0){
  220. alarmDetailMapper.batchInsert(insert);
  221. }
  222. //报警恢复的删除工单
  223. Map<Integer,AlarmDetailsEntity> AlarmDtoNewsMap = insert.stream().collect(Collectors.toMap(AlarmDetailsEntity::getDeviceId, a -> a,(k1, k2)->k1));
  224. Map<Integer,AlarmDetailsEntity> AlarmDtoOldsMap = update.stream().collect(Collectors.toMap(AlarmDetailsEntity::getDeviceId, a -> a,(k1, k2)->k1));
  225. Set<Long> deviceIds = alarmDetailsDtoMap.values().stream()
  226. .filter(a -> !(AlarmDtoNewsMap.containsKey(a.getDeviceId())||AlarmDtoOldsMap.containsKey(a.getDeviceId())))
  227. .map(a -> a.getDeviceId().longValue())
  228. .collect(Collectors.toSet());
  229. if(deviceIds.size()>0){
  230. operationManagerClient.batchDeleteByAlarms(new ArrayList<>(deviceIds));
  231. }
  232. }
  233. }
  234. }
  235. }