ReceiveData.java 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. package com.huaxu.rabbitmq;
  2. /**
  3. * @description 监听rabbitmq队列消息,获取设备报警信息
  4. * @auto wangli
  5. * @data 2020-11-18 21:23
  6. */
  7. import com.alibaba.fastjson.JSONObject;
  8. import com.huaxu.client.UserCenterClient;
  9. import com.huaxu.common.CalcUtil;
  10. import com.huaxu.common.StringUtils;
  11. import com.huaxu.dao.AlarmDetailMapper;
  12. import com.huaxu.dto.AlarmDetailsDto;
  13. import com.huaxu.dto.DeviceCheckAlarmDto;
  14. import com.huaxu.entity.AlarmDetailsEntity;
  15. import com.huaxu.entity.Message;
  16. import com.huaxu.entity.MonitorDataEntity;
  17. import com.huaxu.entity.MonitorDataValueEntity;
  18. import com.huaxu.service.MonitorDataService;
  19. import com.huaxu.util.MessageSendUtil;
  20. import lombok.extern.slf4j.Slf4j;
  21. import org.apache.commons.lang.exception.ExceptionUtils;
  22. import org.springframework.amqp.core.AmqpTemplate;
  23. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  24. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  25. import org.springframework.beans.factory.annotation.Autowired;
  26. import org.springframework.beans.factory.annotation.Value;
  27. import org.springframework.context.annotation.Bean;
  28. import org.springframework.stereotype.Component;
  29. import javax.annotation.Resource;
  30. import java.text.ParseException;
  31. import java.text.SimpleDateFormat;
  32. import java.util.*;
  33. import java.util.stream.Collectors;
  34. @Component
  35. @Slf4j
  36. public class ReceiveData {
  37. @Autowired
  38. private AmqpTemplate rabbitTemplate;
  39. @Resource
  40. private AlarmDetailMapper alarmDetailMapper;
  41. @Autowired
  42. private UserCenterClient userCenterClient;
  43. @Autowired
  44. private MessageSendUtil messageSendUtil;
  45. @Value("${spring.rabbitmq.listener.queue}")
  46. private String rabbitmqQueue;
  47. @Autowired
  48. private MonitorDataService monitorDataService;
  49. /**
  50. * 先注入,然后再通过SPEL取值
  51. * @return
  52. */
  53. @Bean
  54. public String rabbitmqQueue(){
  55. return rabbitmqQueue;
  56. }
  57. @RabbitHandler
  58. @RabbitListener(queues = "#{rabbitmqQueue}")
  59. public void received(byte[] receivedData) {
  60. try {
  61. log.info("rabbitMq接收消息:"+new String(receivedData));
  62. receivedDataHandle(receivedData);
  63. } catch (Exception e) {
  64. log.error(ExceptionUtils.getStackTrace(e));
  65. // 发送异常时消息返回队列
  66. rabbitTemplate.convertAndSend(rabbitmqQueue, receivedData);
  67. }
  68. }
  69. public void receivedDataHandle(byte[] receivedData){
  70. //{"agentIdentifier":"balikun_lvyuantest_tcp_agent",
  71. //"eventTime":"2020-11-25 19:05:56","id":781234302422745088,"manufacturer":"lvyuantest","mode":"TCP-JSON",
  72. //"originalData":"{\"temp\":0,\"currentPress\":30,\"positiveFlow\":50,\"meterElectricity\":70,\"reverseFlow\":60,\"PH\":8.474,\"currentFlow\":20,\"VOL_STATUS\":1,\"EC\":0,\"currentFlowRate\":40}",
  73. //"originalDataFormat":"application/json",
  74. //"parsedData":"{\"temp\":0,\"currentPress\":30,\"positiveFlow\":50,\"meterElectricity\":70,\"reverseFlow\":60,\"PH\":8.474,\"currentFlow\":20,\"VOL_STATUS\":1,\"EC\":0,\"currentFlowRate\":40}",
  75. //"type":"水质监测仪","unitIdentifier":"HX_DH-001"}
  76. JSONObject jsonObject = JSONObject.parseObject(new String(receivedData));
  77. //对象有时间、有设备编码、有数据则解析
  78. if(jsonObject.containsKey("eventTime")&&jsonObject.containsKey("unitIdentifier")&&jsonObject.containsKey("parsedData")){
  79. SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  80. Date receiveDateTime ;
  81. try {
  82. receiveDateTime = f.parse(jsonObject.getString("eventTime"));
  83. }catch (ParseException e){
  84. log.error("报警信息查询时间转换错误,原数据:eventTime:{},异常信息:{}",jsonObject.containsKey("eventTime"),e.getMessage());
  85. return;
  86. }
  87. String devcieCode =jsonObject .getString("unitIdentifier");
  88. JSONObject receiveData =JSONObject.parseObject(jsonObject .getString("parsedData"));
  89. if(StringUtils.isBlank(devcieCode)){
  90. return ;
  91. }
  92. MonitorDataEntity monitorDataEntity = monitorDataService.getDeviceMonitorInfoByDeviceCode(devcieCode);
  93. //
  94. //查询不到设备或者设备属性为空
  95. if(monitorDataEntity == null
  96. || monitorDataEntity.getDataValues() == null
  97. || monitorDataEntity.getDataValues().size() == 0){
  98. return ;
  99. }
  100. List<MonitorDataValueEntity> monitorDataValueEntities= monitorDataEntity.getDataValues();
  101. for(MonitorDataValueEntity monitorDataValueEntity :monitorDataValueEntities){
  102. monitorDataValueEntity.setDataValue(receiveData.getDouble(monitorDataValueEntity.getIdentifier()));
  103. //单位问题处理
  104. }
  105. monitorDataEntity.setCollectDate(receiveDateTime);
  106. Calendar cal = Calendar.getInstance();
  107. cal.setTime(receiveDateTime);
  108. monitorDataEntity.setYear(cal.get(Calendar.YEAR));
  109. monitorDataEntity.setMonth(cal.get(Calendar.MONTH)+1);
  110. monitorDataEntity.setDay(cal.get(Calendar.DAY_OF_MONTH));
  111. monitorDataEntity.setHour(cal.get(Calendar.HOUR_OF_DAY));
  112. monitorDataService.save(monitorDataEntity);
  113. //修改设备上报时间
  114. alarmDetailMapper.udpateLastUpdateTime(monitorDataEntity.getDeviceId(),receiveDateTime);
  115. //获取需要验证的报警条件
  116. List<DeviceCheckAlarmDto> deviceCheckAlarmDtos = alarmDetailMapper.selectDeviceForCheckAlarm(monitorDataEntity.getDeviceId(),"参数报警");
  117. //有设置报警参数
  118. if(deviceCheckAlarmDtos.size()>0 && StringUtils.isNotBlank(deviceCheckAlarmDtos.get(0).getIdentifiter())){
  119. Integer deviceId= deviceCheckAlarmDtos.get(0).getDeviceId();
  120. //系统中已存在的报警信息
  121. List<AlarmDetailsDto> alarmDetailsDtos = alarmDetailMapper.selectStateAlarm(deviceId,"参数报警");
  122. //已存在的报警信息转为map方便匹配alarmSettingId
  123. Map<Integer,AlarmDetailsDto> alarmDetailsDtoMap = alarmDetailsDtos.stream().collect(Collectors.toMap(AlarmDetailsDto::getAlarmSettingId, a -> a,(k1, k2)->k1));
  124. List<AlarmDetailsEntity> insert =new ArrayList<>();
  125. //校验各参数异常情况
  126. for(DeviceCheckAlarmDto deviceCheckAlarmDto:deviceCheckAlarmDtos){
  127. //获取参数接收值
  128. Double receivedValue = receiveData.getDouble(deviceCheckAlarmDto.getIdentifiter());
  129. if(deviceCheckAlarmDto.checkdeviceAttributeAlarm(receivedValue)){
  130. //判断报警是否已存在
  131. if(alarmDetailsDtoMap.containsKey(deviceCheckAlarmDto.getAlarmSettingId())){
  132. AlarmDetailsDto alarmDetailsDto = alarmDetailsDtoMap.get(deviceCheckAlarmDto.getAlarmSettingId());
  133. alarmDetailsDto.setAlarmValue(receivedValue);
  134. alarmDetailsDto.setAlarmContent(deviceCheckAlarmDto.getAlarminfo(receivedValue));
  135. alarmDetailsDto.setMinValue(CalcUtil.compareBySign(alarmDetailsDto.getMinValue(),receivedValue, "<")?alarmDetailsDto.getMinValue():receivedValue);
  136. alarmDetailsDto.setMaxValue(CalcUtil.compareBySign(alarmDetailsDto.getMaxValue(),receivedValue,">")?alarmDetailsDto.getMaxValue():receivedValue);
  137. // update.add(alarmDetailsDto);
  138. alarmDetailsDto.setDateUpdate( new Date());
  139. alarmDetailMapper.update(alarmDetailsDto);
  140. //已存在的修改后从集合移除
  141. alarmDetailsDtoMap.remove(deviceCheckAlarmDto.getAlarmSettingId());
  142. }else{
  143. AlarmDetailsEntity alarmDetailsEntity = new AlarmDetailsDto();
  144. alarmDetailsEntity.setTenantId(deviceCheckAlarmDto.getTenantId());
  145. alarmDetailsEntity.setParentSceneId(deviceCheckAlarmDto.getParentSceneId());
  146. alarmDetailsEntity.setParentSceneName(deviceCheckAlarmDto.getParentSceneName());
  147. alarmDetailsEntity.setSceneId(deviceCheckAlarmDto.getSceneId());
  148. alarmDetailsEntity.setSceneName(deviceCheckAlarmDto.getSceneName());
  149. alarmDetailsEntity.setDeviceId(deviceCheckAlarmDto.getDeviceId());
  150. alarmDetailsEntity.setCompanyOrgId(deviceCheckAlarmDto.getCompanyOrgId());
  151. alarmDetailsEntity.setDeptOrgId(deviceCheckAlarmDto.getDeptOrgId());
  152. alarmDetailsEntity.setAlarmType(deviceCheckAlarmDto.getAlarmType());
  153. alarmDetailsEntity.setAttributeId(deviceCheckAlarmDto.getAttributeId());
  154. alarmDetailsEntity.setAlarmValue(receivedValue);
  155. alarmDetailsEntity.setAlarmStartTime(receiveDateTime);
  156. alarmDetailsEntity.setAlarmContent(deviceCheckAlarmDto.getAlarminfo(receivedValue));
  157. alarmDetailsEntity.setAlarmSettingId(deviceCheckAlarmDto.getAlarmSettingId());
  158. alarmDetailsEntity.setState(1);
  159. alarmDetailsEntity.setOpState(1);
  160. alarmDetailsEntity.setMinValue(receivedValue);
  161. alarmDetailsEntity.setMaxValue(receivedValue);
  162. alarmDetailsEntity.setStatus(1);
  163. alarmDetailsEntity.setDateCreate(new Date());
  164. alarmDetailsEntity.setDateUpdate(new Date());
  165. alarmDetailsEntity.setCreateBy("system");
  166. alarmDetailsEntity.setUpdateBy("system");
  167. Message message=new Message();
  168. message.setStatus(1);
  169. message.setCreateBy("system");
  170. message.setTenantId(deviceCheckAlarmDto.getTenantId());
  171. message.setMessageId(UUID.randomUUID().toString());
  172. JSONObject jsonContent = new JSONObject();
  173. //${场景名称}${设备名称}【${报警字段}】异常报警
  174. jsonContent.put("场景名称",deviceCheckAlarmDto.getSceneName());
  175. jsonContent.put("设备名称",deviceCheckAlarmDto.getDeviceName());
  176. jsonContent.put("报警字段",deviceCheckAlarmDto.getAttributeName());
  177. message.setMessageContent(jsonObject.toJSONString()); //消息内容,如果需要动态使用,配合模板使用{key:value}
  178. message.setMessageType(1); //消息类型
  179. message.setMessageTemplateId(1); //模板id
  180. message.setChannel(0); //渠道
  181. Integer companyOrgId = deviceCheckAlarmDto.getCompanyOrgId();
  182. Integer departmentOrgId = deviceCheckAlarmDto.getDeptOrgId();
  183. List<Integer> taskUsers = userCenterClient.findUserIdsByPermissonOrg(deviceCheckAlarmDto.getTenantId(),companyOrgId,departmentOrgId);
  184. if(taskUsers!=null){
  185. taskUsers.forEach(id->{
  186. message.setUserId(id);
  187. messageSendUtil.send(message);
  188. });
  189. }
  190. insert.add(alarmDetailsEntity);
  191. }
  192. }
  193. }
  194. //处理完成后,剩下的标记为历史数据
  195. for(AlarmDetailsDto alarmDetailsDto:alarmDetailsDtoMap.values()){
  196. alarmDetailsDto.setState(0);
  197. alarmDetailsDto.setAlarmEndTime(receiveDateTime);
  198. alarmDetailsDto.setDateUpdate(new Date());
  199. alarmDetailMapper.update(alarmDetailsDto);
  200. }
  201. //批量插入新增报警
  202. if(insert.size()>0){
  203. alarmDetailMapper.batchInsert(insert);
  204. }
  205. }
  206. }
  207. }
  208. }