ReceiveData.java 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. package com.huaxu.rabbitmq;
  2. /**
  3. * @description Listens for RabbitMQ queue messages and extracts device alarm information
  4. * @author wangli
  5. * @date 2020-11-18 21:23
  6. */
  7. import com.alibaba.fastjson.JSONObject;
  8. import com.huaxu.common.CalcUtil;
  9. import com.huaxu.common.StringUtils;
  10. import com.huaxu.dao.AlarmDetailMapper;
  11. import com.huaxu.dto.AlarmDetailsDto;
  12. import com.huaxu.dto.DeviceCheckAlarmDto;
  13. import com.huaxu.entity.AlarmDetailsEntity;
  14. import lombok.extern.slf4j.Slf4j;
  15. import org.apache.commons.lang.exception.ExceptionUtils;
  16. import org.springframework.amqp.core.AmqpTemplate;
  17. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  18. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  19. import org.springframework.beans.factory.annotation.Autowired;
  20. import org.springframework.beans.factory.annotation.Value;
  21. import org.springframework.context.annotation.Bean;
  22. import org.springframework.stereotype.Component;
  23. import javax.annotation.Resource;
  24. import java.text.ParseException;
  25. import java.text.SimpleDateFormat;
  26. import java.util.ArrayList;
  27. import java.util.Date;
  28. import java.util.List;
  29. import java.util.Map;
  30. import java.util.stream.Collectors;
  31. @Component
  32. @Slf4j
  33. public class ReceiveData {
  34. @Autowired
  35. private AmqpTemplate rabbitTemplate;
  36. @Resource
  37. private AlarmDetailMapper alarmDetailMapper;
  38. @Value("${spring.rabbitmq.listener.queue}")
  39. private String rabbitmqQueue;
  40. /**
  41. * 先注入,然后再通过SPEL取值
  42. * @return
  43. */
  44. @Bean
  45. public String rabbitmqQueue(){
  46. return rabbitmqQueue;
  47. }
  48. @RabbitHandler
  49. @RabbitListener(queues = "#{rabbitmqQueue}")
  50. public void received(byte[] receivedData) {
  51. try {
  52. log.info("rabbitMq接收消息:"+new String(receivedData));
  53. receivedDataHandle(receivedData);
  54. } catch (Exception e) {
  55. log.error(ExceptionUtils.getStackTrace(e));
  56. // 发送异常时消息返回队列
  57. rabbitTemplate.convertAndSend(rabbitmqQueue, receivedData);
  58. }
  59. }
  60. public void receivedDataHandle(byte[] receivedData){
  61. JSONObject jsonObject = JSONObject.parseObject(new String(receivedData));
  62. //对象有时间、有设备编码、有数据则解析
  63. if(jsonObject.containsKey("eventTime")&&jsonObject.containsKey("unitIdentifier")&&jsonObject.containsKey("parsedData")){
  64. SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  65. Date receiveDateTime ;
  66. try {
  67. receiveDateTime = f.parse(jsonObject.getString("eventTime"));
  68. }catch (ParseException e){
  69. log.error("报警信息查询时间转换错误,原数据:eventTime:{},异常信息:{}",jsonObject.containsKey("eventTime"),e.getMessage());
  70. return;
  71. }
  72. String devcieCode =jsonObject .getString("unitIdentifier");
  73. JSONObject receiveData =JSONObject.parseObject(jsonObject .getString("parsedData"));
  74. //修改设备上报时间
  75. alarmDetailMapper.udpateLastUpdateTime(devcieCode,receiveDateTime);
  76. if(StringUtils.isNotBlank(devcieCode)){
  77. //获取需要验证的报警条件
  78. List<DeviceCheckAlarmDto> deviceCheckAlarmDtos = alarmDetailMapper.selectDeviceForCheckAlarm(devcieCode,"参数报警");
  79. //有设置报警参数
  80. if(deviceCheckAlarmDtos.size()>0 && StringUtils.isNotBlank(deviceCheckAlarmDtos.get(0).getIdentifiter())){
  81. Integer deviceId= deviceCheckAlarmDtos.get(0).getDeviceId();
  82. //系统中已存在的报警信息
  83. List<AlarmDetailsDto> alarmDetailsDtos = alarmDetailMapper.selectStateAlarm(deviceId,"参数报警");
  84. //已存在的报警信息转为map方便匹配alarmSettingId
  85. Map<Integer,AlarmDetailsDto> alarmDetailsDtoMap = alarmDetailsDtos.stream().collect(Collectors.toMap(AlarmDetailsDto::getAlarmSettingId, a -> a,(k1, k2)->k1));
  86. List<AlarmDetailsEntity> insert =new ArrayList<>();
  87. List<AlarmDetailsDto> update =new ArrayList<>();
  88. List<AlarmDetailsDto> delete =new ArrayList<>();
  89. //校验各参数异常情况
  90. for(DeviceCheckAlarmDto deviceCheckAlarmDto:deviceCheckAlarmDtos){
  91. //获取参数接收值
  92. Double receivedValue = receiveData.getDouble(deviceCheckAlarmDto.getIdentifiter());
  93. if(deviceCheckAlarmDto.checkdeviceAttributeAlarm(receivedValue)){
  94. //判断报警是否已存在
  95. if(alarmDetailsDtoMap.containsKey(deviceCheckAlarmDto.getAlarmSettingId())){
  96. AlarmDetailsDto alarmDetailsDto = alarmDetailsDtoMap.get(deviceCheckAlarmDto.getAlarmSettingId());
  97. alarmDetailsDto.setAlarmValue(receivedValue);
  98. alarmDetailsDto.setAlarmContent(deviceCheckAlarmDto.getAlarminfo(receivedValue));
  99. alarmDetailsDto.setMinValue(CalcUtil.compareBySign(alarmDetailsDto.getMinValue(),receivedValue, "<")?alarmDetailsDto.getMinValue():receivedValue);
  100. alarmDetailsDto.setMaxValue(CalcUtil.compareBySign(alarmDetailsDto.getMaxValue(),receivedValue,">")?alarmDetailsDto.getMaxValue():receivedValue);
  101. // update.add(alarmDetailsDto);
  102. alarmDetailsDto.setDateUpdate( new Date());
  103. alarmDetailMapper.update(alarmDetailsDto);
  104. //已存在的修改后从集合移除
  105. alarmDetailsDtoMap.remove(deviceCheckAlarmDto.getAlarmSettingId());
  106. }else{
  107. AlarmDetailsEntity alarmDetailsEntity = new AlarmDetailsDto();
  108. alarmDetailsEntity.setTenantId(deviceCheckAlarmDto.getTenantId());
  109. alarmDetailsEntity.setDeviceId(deviceCheckAlarmDto.getDeviceId());
  110. alarmDetailsEntity.setCompanyOrgId(deviceCheckAlarmDto.getCompanyOrgId());
  111. alarmDetailsEntity.setDeptOrgId(deviceCheckAlarmDto.getDeptOrgId());
  112. alarmDetailsEntity.setAlarmType(deviceCheckAlarmDto.getAlarmType());
  113. alarmDetailsEntity.setAttributeId(deviceCheckAlarmDto.getAttributeId());
  114. alarmDetailsEntity.setAlarmValue(receivedValue);
  115. alarmDetailsEntity.setAlarmStartTime(receiveDateTime);
  116. alarmDetailsEntity.setAlarmContent(deviceCheckAlarmDto.getAlarminfo(receivedValue));
  117. alarmDetailsEntity.setAlarmSettingId(deviceCheckAlarmDto.getAlarmSettingId());
  118. alarmDetailsEntity.setState(1);
  119. alarmDetailsEntity.setOpState(1);
  120. alarmDetailsEntity.setMinValue(receivedValue);
  121. alarmDetailsEntity.setMaxValue(receivedValue);
  122. alarmDetailsEntity.setStatus(1);
  123. alarmDetailsEntity.setDateCreate(new Date());
  124. alarmDetailsEntity.setDateUpdate(new Date());
  125. alarmDetailsEntity.setCreateBy("system");
  126. alarmDetailsEntity.setUpdateBy("system");
  127. insert.add(alarmDetailsEntity);
  128. }
  129. }
  130. }
  131. //处理完成后,剩下的标记为历史数据
  132. for(AlarmDetailsDto alarmDetailsDto:alarmDetailsDtoMap.values()){
  133. alarmDetailsDto.setState(0);
  134. alarmDetailsDto.setAlarmEndTime(receiveDateTime);
  135. alarmDetailsDto.setDateUpdate(new Date());
  136. alarmDetailMapper.update(alarmDetailsDto);
  137. }
  138. //批量插入新增报警
  139. if(insert.size()>0){
  140. alarmDetailMapper.batchInsert(insert);
  141. }
  142. }
  143. }
  144. }
  145. }
  146. }