ReceiveData.java 10 KB


  1. package com.huaxu.rabbitmq;
/**
 * @description Listens for RabbitMQ queue messages and extracts device alarm information.
 * @author wangli
 * @date 2020-11-18 21:23
 */
import com.alibaba.fastjson.JSONObject;
import com.huaxu.common.CalcUtil;
import com.huaxu.common.StringUtils;
import com.huaxu.dao.AlarmDetailMapper;
import com.huaxu.dto.AlarmDetailsDto;
import com.huaxu.dto.DeviceCheckAlarmDto;
import com.huaxu.entity.AlarmDetailsEntity;
import com.huaxu.entity.MonitorDataEntity;
import com.huaxu.entity.MonitorDataValueEntity;
import com.huaxu.service.MonitorDataService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;

import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.stream.Collectors;
  31. @Component
  32. @Slf4j
  33. public class ReceiveData {
  34. @Autowired
  35. private AmqpTemplate rabbitTemplate;
  36. @Resource
  37. private AlarmDetailMapper alarmDetailMapper;
  38. @Value("${spring.rabbitmq.listener.queue}")
  39. private String rabbitmqQueue;
  40. @Autowired
  41. private MonitorDataService monitorDataService;
  42. /**
  43. * 先注入,然后再通过SPEL取值
  44. * @return
  45. */
  46. @Bean
  47. public String rabbitmqQueue(){
  48. return rabbitmqQueue;
  49. }
  50. @RabbitHandler
  51. @RabbitListener(queues = "#{rabbitmqQueue}")
  52. public void received(byte[] receivedData) {
  53. try {
  54. log.info("rabbitMq接收消息:"+new String(receivedData));
  55. receivedDataHandle(receivedData);
  56. } catch (Exception e) {
  57. log.error(ExceptionUtils.getStackTrace(e));
  58. // 发送异常时消息返回队列
  59. rabbitTemplate.convertAndSend(rabbitmqQueue, receivedData);
  60. }
  61. }
  62. public void receivedDataHandle(byte[] receivedData){
  63. //{"agentIdentifier":"balikun_lvyuantest_tcp_agent",
  64. //"eventTime":"2020-11-25 19:05:56","id":781234302422745088,"manufacturer":"lvyuantest","mode":"TCP-JSON",
  65. //"originalData":"{\"temp\":0,\"currentPress\":30,\"positiveFlow\":50,\"meterElectricity\":70,\"reverseFlow\":60,\"PH\":8.474,\"currentFlow\":20,\"VOL_STATUS\":1,\"EC\":0,\"currentFlowRate\":40}",
  66. //"originalDataFormat":"application/json",
  67. //"parsedData":"{\"temp\":0,\"currentPress\":30,\"positiveFlow\":50,\"meterElectricity\":70,\"reverseFlow\":60,\"PH\":8.474,\"currentFlow\":20,\"VOL_STATUS\":1,\"EC\":0,\"currentFlowRate\":40}",
  68. //"type":"水质监测仪","unitIdentifier":"HX_DH-001"}
  69. JSONObject jsonObject = JSONObject.parseObject(new String(receivedData));
  70. //对象有时间、有设备编码、有数据则解析
  71. if(jsonObject.containsKey("eventTime")&&jsonObject.containsKey("unitIdentifier")&&jsonObject.containsKey("parsedData")){
  72. SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  73. Date receiveDateTime ;
  74. try {
  75. receiveDateTime = f.parse(jsonObject.getString("eventTime"));
  76. }catch (ParseException e){
  77. log.error("报警信息查询时间转换错误,原数据:eventTime:{},异常信息:{}",jsonObject.containsKey("eventTime"),e.getMessage());
  78. return;
  79. }
  80. String devcieCode =jsonObject .getString("unitIdentifier");
  81. JSONObject receiveData =JSONObject.parseObject(jsonObject .getString("parsedData"));
  82. //保存报警数据
  83. if(StringUtils.isBlank(devcieCode)){
  84. return ;
  85. }
  86. List<MonitorDataEntity> list = new ArrayList<>();
  87. //暂时测试共用一个设备数据
  88. MonitorDataEntity monitorDataEntity = monitorDataService.getDeviceMonitorInfoByDeviceCode(devcieCode);
  89. //
  90. //查询不到设备或者设备属性为空
  91. if(monitorDataEntity == null
  92. || monitorDataEntity.getDataValues() == null
  93. || monitorDataEntity.getDataValues().size() == 0){
  94. return ;
  95. }
  96. List<MonitorDataValueEntity> monitorDataValueEntities= monitorDataEntity.getDataValues();
  97. for(MonitorDataValueEntity monitorDataValueEntity :monitorDataValueEntities){
  98. monitorDataValueEntity.setDataValue(receiveData.getDouble(monitorDataValueEntity.getIdentifier()));
  99. //单位问题处理
  100. }
  101. monitorDataEntity.setCollectDate(receiveDateTime);
  102. Calendar cal = Calendar.getInstance();
  103. cal.setTime(receiveDateTime);
  104. monitorDataEntity.setYear(cal.get(Calendar.YEAR));
  105. monitorDataEntity.setMonth(cal.get(Calendar.MONTH)+1);
  106. monitorDataEntity.setDay(cal.get(Calendar.DAY_OF_MONTH));
  107. monitorDataEntity.setHour(cal.get(Calendar.HOUR_OF_DAY));
  108. monitorDataService.save(monitorDataEntity);
  109. //修改设备上报时间
  110. alarmDetailMapper.udpateLastUpdateTime(monitorDataEntity.getDeviceId(),receiveDateTime);
  111. //获取需要验证的报警条件
  112. List<DeviceCheckAlarmDto> deviceCheckAlarmDtos = alarmDetailMapper.selectDeviceForCheckAlarm(monitorDataEntity.getDeviceId(),"参数报警");
  113. //有设置报警参数
  114. if(deviceCheckAlarmDtos.size()>0 && StringUtils.isNotBlank(deviceCheckAlarmDtos.get(0).getIdentifiter())){
  115. Integer deviceId= deviceCheckAlarmDtos.get(0).getDeviceId();
  116. //系统中已存在的报警信息
  117. List<AlarmDetailsDto> alarmDetailsDtos = alarmDetailMapper.selectStateAlarm(deviceId,"参数报警");
  118. //已存在的报警信息转为map方便匹配alarmSettingId
  119. Map<Integer,AlarmDetailsDto> alarmDetailsDtoMap = alarmDetailsDtos.stream().collect(Collectors.toMap(AlarmDetailsDto::getAlarmSettingId, a -> a,(k1, k2)->k1));
  120. List<AlarmDetailsEntity> insert =new ArrayList<>();
  121. List<AlarmDetailsDto> update =new ArrayList<>();
  122. List<AlarmDetailsDto> delete =new ArrayList<>();
  123. //校验各参数异常情况
  124. for(DeviceCheckAlarmDto deviceCheckAlarmDto:deviceCheckAlarmDtos){
  125. //获取参数接收值
  126. Double receivedValue = receiveData.getDouble(deviceCheckAlarmDto.getIdentifiter());
  127. if(deviceCheckAlarmDto.checkdeviceAttributeAlarm(receivedValue)){
  128. //判断报警是否已存在
  129. if(alarmDetailsDtoMap.containsKey(deviceCheckAlarmDto.getAlarmSettingId())){
  130. AlarmDetailsDto alarmDetailsDto = alarmDetailsDtoMap.get(deviceCheckAlarmDto.getAlarmSettingId());
  131. alarmDetailsDto.setAlarmValue(receivedValue);
  132. alarmDetailsDto.setAlarmContent(deviceCheckAlarmDto.getAlarminfo(receivedValue));
  133. alarmDetailsDto.setMinValue(CalcUtil.compareBySign(alarmDetailsDto.getMinValue(),receivedValue, "<")?alarmDetailsDto.getMinValue():receivedValue);
  134. alarmDetailsDto.setMaxValue(CalcUtil.compareBySign(alarmDetailsDto.getMaxValue(),receivedValue,">")?alarmDetailsDto.getMaxValue():receivedValue);
  135. // update.add(alarmDetailsDto);
  136. alarmDetailsDto.setDateUpdate( new Date());
  137. alarmDetailMapper.update(alarmDetailsDto);
  138. //已存在的修改后从集合移除
  139. alarmDetailsDtoMap.remove(deviceCheckAlarmDto.getAlarmSettingId());
  140. }else{
  141. AlarmDetailsEntity alarmDetailsEntity = new AlarmDetailsDto();
  142. alarmDetailsEntity.setTenantId(deviceCheckAlarmDto.getTenantId());
  143. alarmDetailsEntity.setDeviceId(deviceCheckAlarmDto.getDeviceId());
  144. alarmDetailsEntity.setCompanyOrgId(deviceCheckAlarmDto.getCompanyOrgId());
  145. alarmDetailsEntity.setDeptOrgId(deviceCheckAlarmDto.getDeptOrgId());
  146. alarmDetailsEntity.setAlarmType(deviceCheckAlarmDto.getAlarmType());
  147. alarmDetailsEntity.setAttributeId(deviceCheckAlarmDto.getAttributeId());
  148. alarmDetailsEntity.setAlarmValue(receivedValue);
  149. alarmDetailsEntity.setAlarmStartTime(receiveDateTime);
  150. alarmDetailsEntity.setAlarmContent(deviceCheckAlarmDto.getAlarminfo(receivedValue));
  151. alarmDetailsEntity.setAlarmSettingId(deviceCheckAlarmDto.getAlarmSettingId());
  152. alarmDetailsEntity.setState(1);
  153. alarmDetailsEntity.setOpState(1);
  154. alarmDetailsEntity.setMinValue(receivedValue);
  155. alarmDetailsEntity.setMaxValue(receivedValue);
  156. alarmDetailsEntity.setStatus(1);
  157. alarmDetailsEntity.setDateCreate(new Date());
  158. alarmDetailsEntity.setDateUpdate(new Date());
  159. alarmDetailsEntity.setCreateBy("system");
  160. alarmDetailsEntity.setUpdateBy("system");
  161. insert.add(alarmDetailsEntity);
  162. }
  163. }
  164. }
  165. //处理完成后,剩下的标记为历史数据
  166. for(AlarmDetailsDto alarmDetailsDto:alarmDetailsDtoMap.values()){
  167. alarmDetailsDto.setState(0);
  168. alarmDetailsDto.setAlarmEndTime(receiveDateTime);
  169. alarmDetailsDto.setDateUpdate(new Date());
  170. alarmDetailMapper.update(alarmDetailsDto);
  171. }
  172. //批量插入新增报警
  173. if(insert.size()>0){
  174. alarmDetailMapper.batchInsert(insert);
  175. }
  176. }
  177. }
  178. }
  179. }