ReceiveData.java 16 KB


  1. package com.huaxu.rabbitmq;
  2. /**
  3. * @description 监听rabbitmq队列消息,获取设备报警信息
  4. * @auto wangli
  5. * @data 2020-11-18 21:23
  6. */
  7. import com.alibaba.fastjson.JSONObject;
  8. import com.huaxu.client.OperationManagerClient;
  9. import com.huaxu.client.UserCenterClient;
  10. import com.huaxu.common.CalcUtil;
  11. import com.huaxu.common.StringUtils;
  12. import com.huaxu.dao.AlarmDetailMapper;
  13. import com.huaxu.dto.AlarmDetailsDto;
  14. import com.huaxu.dto.DeviceCheckAlarmDto;
  15. import com.huaxu.dto.WorkOrderManageByAlarmDto;
  16. import com.huaxu.entity.AlarmDetailsEntity;
  17. import com.huaxu.entity.Message;
  18. import com.huaxu.entity.MonitorDataEntity;
  19. import com.huaxu.entity.MonitorDataValueEntity;
  20. import com.huaxu.service.MonitorDataService;
  21. import lombok.extern.slf4j.Slf4j;
  22. import org.apache.commons.lang.exception.ExceptionUtils;
  23. import org.springframework.amqp.core.AmqpTemplate;
  24. import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  25. import org.springframework.amqp.rabbit.annotation.RabbitListener;
  26. import org.springframework.beans.factory.annotation.Autowired;
  27. import org.springframework.beans.factory.annotation.Value;
  28. import org.springframework.context.annotation.Bean;
  29. import org.springframework.stereotype.Component;
  30. import javax.annotation.Resource;
  31. import java.awt.*;
  32. import java.math.BigDecimal;
  33. import java.text.ParseException;
  34. import java.text.SimpleDateFormat;
  35. import java.util.*;
  36. import java.util.List;
  37. import java.util.stream.Collectors;
  38. @Component
  39. @Slf4j
  40. public class ReceiveData {
  41. @Resource(name = "messageRabbitTemplate")
  42. private AmqpTemplate messageRabbitTemplate;
  43. @Resource(name = "receiveDataRabbitTemplate")
  44. private AmqpTemplate receiveDataRabbitTemplate;
  45. @Resource
  46. private AlarmDetailMapper alarmDetailMapper;
  47. @Autowired
  48. private UserCenterClient userCenterClient;
  49. @Value("${receiveData.spring.rabbitmq.listener.queue}")
  50. private String rabbitmqQueue;
  51. @Autowired
  52. private MonitorDataService monitorDataService;
  53. @Autowired
  54. private OperationManagerClient operationManagerClient;
  55. /**
  56. * 先注入,然后再通过SPEL取值
  57. * @return
  58. */
  59. //@Bean
  60. public String rabbitmqQueue(){
  61. return rabbitmqQueue;
  62. }
  63. @Value("${receive.exchange.name}")
  64. private String receiveExchangeName;
  65. @Value("${dispath.routing.key}")
  66. private String dispathRoutingKey;
  67. private void send(Message message){
  68. log.debug("消息发送 exchange={} routingkey={} 用户id={}",receiveExchangeName,dispathRoutingKey,message.getUserId());
  69. messageRabbitTemplate.convertAndSend(receiveExchangeName,dispathRoutingKey, JSONObject.toJSONString(message));
  70. }
  71. //@RabbitHandler
  72. //@RabbitListener(queues = "#{rabbitmqQueue}",containerFactory = "receiveDataContainerFactory")
  73. public void received(byte[] receivedData) {
  74. try {
  75. log.debug("rabbitMq接收消息:"+new String(receivedData));
  76. receivedDataHandle(receivedData);
  77. } catch (Exception e) {
  78. log.error(ExceptionUtils.getStackTrace(e));
  79. // 发送异常时消息返回队列
  80. // receiveDataRabbitTemplate.convertAndSend(rabbitmqQueue, receivedData);
  81. }
  82. }
  83. public void receivedDataHandle(byte[] receivedData){
  84. //{"agentIdentifier":"balikun_lvyuantest_tcp_agent",
  85. //"eventTime":"2020-11-25 19:05:56","id":781234302422745088,"manufacturer":"lvyuantest","mode":"TCP-JSON",
  86. //"originalData":"{\"temp\":0,\"currentPress\":30,\"positiveFlow\":50,\"meterElectricity\":70,\"reverseFlow\":60,\"PH\":8.474,\"currentFlow\":20,\"VOL_STATUS\":1,\"EC\":0,\"currentFlowRate\":40}",
  87. //"originalDataFormat":"application/json",
  88. //"parsedData":"{\"temp\":0,\"currentPress\":30,\"positiveFlow\":50,\"meterElectricity\":70,\"reverseFlow\":60,\"PH\":8.474,\"currentFlow\":20,\"VOL_STATUS\":1,\"EC\":0,\"currentFlowRate\":40}",
  89. //"type":"水质监测仪","unitIdentifier":"HX_DH-001"}
  90. JSONObject jsonObject = JSONObject.parseObject(new String(receivedData));
  91. //对象有时间、有设备编码、有数据则解析
  92. if(jsonObject.containsKey("eventTime")&&jsonObject.containsKey("unitIdentifier")&&jsonObject.containsKey("parsedData")){
  93. SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  94. Date receiveDateTime ;
  95. try {
  96. receiveDateTime = f.parse(jsonObject.getString("eventTime"));
  97. }catch (ParseException e){
  98. log.error("报警信息查询时间转换错误,原数据:eventTime:{},异常信息:{}",jsonObject.containsKey("eventTime"),e.getMessage());
  99. return;
  100. }
  101. String devcieCode =jsonObject .getString("unitIdentifier");
  102. JSONObject receiveData =JSONObject.parseObject(jsonObject .getString("parsedData"));
  103. if(StringUtils.isBlank(devcieCode)){
  104. return ;
  105. }
  106. MonitorDataEntity monitorDataEntity = monitorDataService.getDeviceMonitorInfoByDeviceCode(devcieCode);
  107. //
  108. //查询不到设备或者设备属性为空
  109. if(monitorDataEntity == null
  110. || monitorDataEntity.getDataValues() == null
  111. || monitorDataEntity.getDataValues().size() == 0){
  112. return ;
  113. }
  114. List<MonitorDataValueEntity> monitorDataValueEntities= monitorDataEntity.getDataValues();
  115. Integer number = 0;
  116. for(MonitorDataValueEntity monitorDataValueEntity :monitorDataValueEntities){
  117. if(receiveData.containsKey(monitorDataValueEntity.getIdentifier())){
  118. number++;
  119. //小数位处理
  120. Double dataValue =new BigDecimal(receiveData.getDouble(monitorDataValueEntity.getIdentifier()))
  121. .setScale(5, BigDecimal.ROUND_HALF_UP).doubleValue();
  122. monitorDataValueEntity.setDataValue(dataValue);
  123. //单位问题处理
  124. }
  125. }
  126. //没有匹配到属性,视为垃圾数据忽略
  127. if(number == 0){
  128. return ;
  129. }
  130. monitorDataEntity.setCollectDate(receiveDateTime);
  131. Calendar cal = Calendar.getInstance();
  132. cal.setTime(receiveDateTime);
  133. monitorDataEntity.setYear(cal.get(Calendar.YEAR));
  134. monitorDataEntity.setMonth(cal.get(Calendar.MONTH)+1);
  135. monitorDataEntity.setDay(cal.get(Calendar.DAY_OF_MONTH));
  136. monitorDataEntity.setHour(cal.get(Calendar.HOUR_OF_DAY));
  137. monitorDataService.save(monitorDataEntity);
  138. //修改设备上报时间
  139. alarmDetailMapper.udpateLastUpdateTime(monitorDataEntity.getDeviceId(),receiveDateTime);
  140. //获取需要验证的报警条件
  141. List<DeviceCheckAlarmDto> deviceCheckAlarmDtos = alarmDetailMapper.selectDeviceForCheckAlarm(monitorDataEntity.getDeviceId(),"参数报警");
  142. //有设置报警参数
  143. if(deviceCheckAlarmDtos.size()>0 && StringUtils.isNotBlank(deviceCheckAlarmDtos.get(0).getIdentifiter())){
  144. Integer deviceId= deviceCheckAlarmDtos.get(0).getDeviceId();
  145. //系统中已存在的报警信息
  146. List<AlarmDetailsDto> alarmDetailsDtos = alarmDetailMapper.selectStateAlarm(deviceId,"参数报警",null);
  147. //已存在的报警信息转为map方便匹配alarmSettingId
  148. Map<Integer,AlarmDetailsDto> alarmDetailsDtoMap = alarmDetailsDtos.stream().collect(Collectors.toMap(AlarmDetailsDto::getAlarmSettingId, a -> a,(k1, k2)->k1));
  149. List<AlarmDetailsEntity> insert =new ArrayList<>();
  150. List<AlarmDetailsEntity> update =new ArrayList<>();
  151. //校验各参数异常情况
  152. for(DeviceCheckAlarmDto deviceCheckAlarmDto:deviceCheckAlarmDtos){
  153. //获取参数接收值
  154. Double receivedValue = receiveData.getDouble(deviceCheckAlarmDto.getIdentifiter());
  155. if(deviceCheckAlarmDto.checkdeviceAttributeAlarm(receivedValue)){
  156. //判断报警是否已存在
  157. if(alarmDetailsDtoMap.containsKey(deviceCheckAlarmDto.getAlarmSettingId())){
  158. AlarmDetailsDto alarmDetailsDto = alarmDetailsDtoMap.get(deviceCheckAlarmDto.getAlarmSettingId());
  159. alarmDetailsDto.setAlarmValue(receivedValue);
  160. // alarmDetailsDto.setAlarmContent(deviceCheckAlarmDto.getAlarminfo(receivedValue));
  161. alarmDetailsDto.setMinValue(CalcUtil.compareBySign(alarmDetailsDto.getMinValue(),receivedValue, "<")?alarmDetailsDto.getMinValue():receivedValue);
  162. alarmDetailsDto.setMaxValue(CalcUtil.compareBySign(alarmDetailsDto.getMaxValue(),receivedValue,">")?alarmDetailsDto.getMaxValue():receivedValue);
  163. // update.add(alarmDetailsDto);
  164. alarmDetailsDto.setDateUpdate( new Date());
  165. alarmDetailMapper.update(alarmDetailsDto);
  166. update.add(alarmDetailsDto);
  167. //已存在的修改后从集合移除
  168. alarmDetailsDtoMap.remove(deviceCheckAlarmDto.getAlarmSettingId());
  169. }else{
  170. AlarmDetailsEntity alarmDetailsEntity = new AlarmDetailsDto();
  171. alarmDetailsEntity.setTenantId(deviceCheckAlarmDto.getTenantId());
  172. alarmDetailsEntity.setParentSceneId(deviceCheckAlarmDto.getParentSceneId());
  173. alarmDetailsEntity.setParentSceneName(deviceCheckAlarmDto.getParentSceneName());
  174. alarmDetailsEntity.setSceneId(deviceCheckAlarmDto.getSceneId());
  175. alarmDetailsEntity.setSceneName(deviceCheckAlarmDto.getSceneName());
  176. alarmDetailsEntity.setDeviceId(deviceCheckAlarmDto.getDeviceId());
  177. alarmDetailsEntity.setCompanyOrgId(deviceCheckAlarmDto.getCompanyOrgId());
  178. alarmDetailsEntity.setDeptOrgId(deviceCheckAlarmDto.getDeptOrgId());
  179. alarmDetailsEntity.setAlarmType(deviceCheckAlarmDto.getAlarmType());
  180. alarmDetailsEntity.setAttributeId(deviceCheckAlarmDto.getAttributeId());
  181. alarmDetailsEntity.setAlarmValue(receivedValue);
  182. alarmDetailsEntity.setAlarmStartTime(receiveDateTime);
  183. alarmDetailsEntity.setAlarmContent(deviceCheckAlarmDto.getAlarminfo(receivedValue));
  184. alarmDetailsEntity.setAlarmSettingId(deviceCheckAlarmDto.getAlarmSettingId());
  185. alarmDetailsEntity.setState(1);
  186. alarmDetailsEntity.setOpState(1);
  187. alarmDetailsEntity.setMinValue(receivedValue);
  188. alarmDetailsEntity.setMaxValue(receivedValue);
  189. alarmDetailsEntity.setStatus(1);
  190. alarmDetailsEntity.setDateCreate(new Date());
  191. alarmDetailsEntity.setDateUpdate(new Date());
  192. alarmDetailsEntity.setCreateBy("system");
  193. alarmDetailsEntity.setUpdateBy("system");
  194. Message message=new Message();
  195. message.setStatus(1);
  196. message.setTenantId(deviceCheckAlarmDto.getTenantId());
  197. message.setMessageId(UUID.randomUUID().toString());
  198. JSONObject jsonContent = new JSONObject();
  199. //${场景名称}${设备名称}【${报警字段}】异常报警
  200. jsonContent.put("场景名称",deviceCheckAlarmDto.getSceneName());
  201. jsonContent.put("设备名称",deviceCheckAlarmDto.getDeviceName());
  202. jsonContent.put("报警字段",deviceCheckAlarmDto.getAttributeName());
  203. message.setMessageContent(jsonContent.toJSONString()); //消息内容,如果需要动态使用,配合模板使用{key:value}
  204. message.setMessageType(1); //消息类型
  205. message.setMessageTemplateId(1); //模板id
  206. message.setChannel(0); //渠道
  207. Integer companyOrgId = deviceCheckAlarmDto.getCompanyOrgId();
  208. Integer departmentOrgId = deviceCheckAlarmDto.getDeptOrgId();
  209. try{
  210. List<Integer> taskUsers = userCenterClient.findUserIdsByPermissonOrg(deviceCheckAlarmDto.getTenantId(),companyOrgId,departmentOrgId);
  211. if(taskUsers!=null){
  212. taskUsers.forEach(id->{
  213. message.setUserId(id);
  214. this.send(message);
  215. });
  216. }
  217. }catch(Exception e){
  218. e.printStackTrace();
  219. log.info("推送报警消息失败:{}",e.getMessage());
  220. }
  221. insert.add(alarmDetailsEntity);
  222. }
  223. }
  224. }
  225. //批量插入新增报警
  226. if(insert.size()>0){
  227. alarmDetailMapper.batchInsert(insert);
  228. }
  229. if(alarmDetailsDtoMap.values().size() > 0){
  230. //处理完成后,剩下的标记为历史数据
  231. List<Long> deviceIds = new ArrayList<>();
  232. deviceIds.add(monitorDataEntity.getDeviceId().longValue());
  233. List<WorkOrderManageByAlarmDto> workOrders = operationManagerClient.findWorkOrderByDeviceIds(deviceIds);
  234. if(workOrders.size()>0){
  235. WorkOrderManageByAlarmDto orderManageByAlarmDto = workOrders.get(0);
  236. String taskDesc = orderManageByAlarmDto.getTaskDesc();
  237. //标记为历史数据 并 修改工单描述
  238. for(AlarmDetailsDto alarmDetailsDto : alarmDetailsDtoMap.values()){
  239. alarmDetailsDto.setState(0);
  240. alarmDetailsDto.setAlarmEndTime(receiveDateTime);
  241. alarmDetailsDto.setDateUpdate(new Date());
  242. alarmDetailMapper.update(alarmDetailsDto);
  243. taskDesc = taskDesc.replace(";"+alarmDetailsDto.getAlarmContent(),"")
  244. .replace(alarmDetailsDto.getAlarmContent(),"");
  245. }
  246. orderManageByAlarmDto.setTaskDesc(taskDesc);
  247. if(orderManageByAlarmDto != null
  248. && (orderManageByAlarmDto.getOrderStatus() == null || orderManageByAlarmDto.getOrderStatus() == 0)
  249. && StringUtils.isBlank(taskDesc)){
  250. //工单未派单且无报警信息,删除工单
  251. operationManagerClient.batchDeleteByAlarms(new ArrayList<>(deviceIds));
  252. }else{
  253. //其他情况修改描述
  254. operationManagerClient.updateByAlarms(workOrders);
  255. }
  256. }
  257. }
  258. //报警恢复的删除工单
  259. // Map<Integer,AlarmDetailsEntity> AlarmDtoOldsMap = update.stream().collect(Collectors.toMap(AlarmDetailsEntity::getDeviceId, a -> a,(k1, k2)->k1));
  260. // Set<Long> deviceIds = alarmDetailsDtoMap.values().stream()
  261. // .filter(a -> !AlarmDtoOldsMap.containsKey(a.getDeviceId()))
  262. // .map(a -> a.getDeviceId().longValue())
  263. // .collect(Collectors.toSet());
  264. // if(deviceIds.size()>0){
  265. // operationManagerClient.batchDeleteByAlarms(new ArrayList<>(deviceIds));
  266. // }
  267. }
  268. }
  269. }
  270. }