package com.huaxu.rabbitmq; /** * @description 监听rabbitmq队列消息,获取设备报警信息 * @auto wangli * @data 2020-11-18 21:23 */ import com.alibaba.fastjson.JSONObject; import com.huaxu.client.OperationManagerClient; import com.huaxu.client.UserCenterClient; import com.huaxu.common.CalcUtil; import com.huaxu.common.StringUtils; import com.huaxu.dao.AlarmDetailMapper; import com.huaxu.dto.AlarmDetailsDto; import com.huaxu.dto.DeviceCheckAlarmDto; import com.huaxu.entity.AlarmDetailsEntity; import com.huaxu.entity.Message; import com.huaxu.entity.MonitorDataEntity; import com.huaxu.entity.MonitorDataValueEntity; import com.huaxu.service.MonitorDataService; import com.huaxu.util.MessageSendUtil; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang.exception.ExceptionUtils; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.*; import java.util.stream.Collectors; @Component @Slf4j public class ReceiveData { @Autowired private AmqpTemplate rabbitTemplate; @Resource private AlarmDetailMapper alarmDetailMapper; @Autowired private UserCenterClient userCenterClient; @Autowired private MessageSendUtil messageSendUtil; @Value("${spring.rabbitmq.listener.queue}") private String rabbitmqQueue; @Autowired private MonitorDataService monitorDataService; @Autowired private OperationManagerClient operationManagerClient; /** * 先注入,然后再通过SPEL取值 * @return */ @Bean public String rabbitmqQueue(){ return rabbitmqQueue; } @RabbitHandler @RabbitListener(queues = "#{rabbitmqQueue}") public void received(byte[] receivedData) { try { log.info("rabbitMq接收消息:"+new String(receivedData)); receivedDataHandle(receivedData); } catch (Exception e) { log.error(ExceptionUtils.getStackTrace(e)); // 发送异常时消息返回队列 rabbitTemplate.convertAndSend(rabbitmqQueue, receivedData); } } public void receivedDataHandle(byte[] receivedData){ //{"agentIdentifier":"balikun_lvyuantest_tcp_agent", //"eventTime":"2020-11-25 19:05:56","id":781234302422745088,"manufacturer":"lvyuantest","mode":"TCP-JSON", //"originalData":"{\"temp\":0,\"currentPress\":30,\"positiveFlow\":50,\"meterElectricity\":70,\"reverseFlow\":60,\"PH\":8.474,\"currentFlow\":20,\"VOL_STATUS\":1,\"EC\":0,\"currentFlowRate\":40}", //"originalDataFormat":"application/json", //"parsedData":"{\"temp\":0,\"currentPress\":30,\"positiveFlow\":50,\"meterElectricity\":70,\"reverseFlow\":60,\"PH\":8.474,\"currentFlow\":20,\"VOL_STATUS\":1,\"EC\":0,\"currentFlowRate\":40}", //"type":"水质监测仪","unitIdentifier":"HX_DH-001"} JSONObject jsonObject = JSONObject.parseObject(new String(receivedData)); //对象有时间、有设备编码、有数据则解析 if(jsonObject.containsKey("eventTime")&&jsonObject.containsKey("unitIdentifier")&&jsonObject.containsKey("parsedData")){ SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); Date receiveDateTime ; try { receiveDateTime = f.parse(jsonObject.getString("eventTime")); }catch (ParseException e){ log.error("报警信息查询时间转换错误,原数据:eventTime:{},异常信息:{}",jsonObject.containsKey("eventTime"),e.getMessage()); return; } String devcieCode =jsonObject .getString("unitIdentifier"); JSONObject receiveData =JSONObject.parseObject(jsonObject .getString("parsedData")); if(StringUtils.isBlank(devcieCode)){ return ; } MonitorDataEntity monitorDataEntity = monitorDataService.getDeviceMonitorInfoByDeviceCode(devcieCode); // //查询不到设备或者设备属性为空 if(monitorDataEntity == null || monitorDataEntity.getDataValues() == null || monitorDataEntity.getDataValues().size() == 0){ return ; } List monitorDataValueEntities= monitorDataEntity.getDataValues(); Integer number = 0; for(MonitorDataValueEntity monitorDataValueEntity :monitorDataValueEntities){ if(receiveData.containsKey(monitorDataValueEntity.getIdentifier())){ number++; monitorDataValueEntity.setDataValue(receiveData.getDouble(monitorDataValueEntity.getIdentifier())); //单位问题处理 } } //没有匹配到属性,视为垃圾数据忽略 if(number == 0){ return ; } monitorDataEntity.setCollectDate(receiveDateTime); Calendar cal = Calendar.getInstance(); cal.setTime(receiveDateTime); monitorDataEntity.setYear(cal.get(Calendar.YEAR)); monitorDataEntity.setMonth(cal.get(Calendar.MONTH)+1); monitorDataEntity.setDay(cal.get(Calendar.DAY_OF_MONTH)); monitorDataEntity.setHour(cal.get(Calendar.HOUR_OF_DAY)); monitorDataService.save(monitorDataEntity); //修改设备上报时间 alarmDetailMapper.udpateLastUpdateTime(monitorDataEntity.getDeviceId(),receiveDateTime); //获取需要验证的报警条件 List deviceCheckAlarmDtos = alarmDetailMapper.selectDeviceForCheckAlarm(monitorDataEntity.getDeviceId(),"参数报警"); //有设置报警参数 if(deviceCheckAlarmDtos.size()>0 && StringUtils.isNotBlank(deviceCheckAlarmDtos.get(0).getIdentifiter())){ Integer deviceId= deviceCheckAlarmDtos.get(0).getDeviceId(); //系统中已存在的报警信息 List alarmDetailsDtos = alarmDetailMapper.selectStateAlarm(deviceId,"参数报警"); //已存在的报警信息转为map方便匹配alarmSettingId Map alarmDetailsDtoMap = alarmDetailsDtos.stream().collect(Collectors.toMap(AlarmDetailsDto::getAlarmSettingId, a -> a,(k1, k2)->k1)); List insert =new ArrayList<>(); List update =new ArrayList<>(); //校验各参数异常情况 for(DeviceCheckAlarmDto deviceCheckAlarmDto:deviceCheckAlarmDtos){ //获取参数接收值 Double receivedValue = receiveData.getDouble(deviceCheckAlarmDto.getIdentifiter()); if(deviceCheckAlarmDto.checkdeviceAttributeAlarm(receivedValue)){ //判断报警是否已存在 if(alarmDetailsDtoMap.containsKey(deviceCheckAlarmDto.getAlarmSettingId())){ AlarmDetailsDto alarmDetailsDto = alarmDetailsDtoMap.get(deviceCheckAlarmDto.getAlarmSettingId()); alarmDetailsDto.setAlarmValue(receivedValue); alarmDetailsDto.setAlarmContent(deviceCheckAlarmDto.getAlarminfo(receivedValue)); alarmDetailsDto.setMinValue(CalcUtil.compareBySign(alarmDetailsDto.getMinValue(),receivedValue, "<")?alarmDetailsDto.getMinValue():receivedValue); alarmDetailsDto.setMaxValue(CalcUtil.compareBySign(alarmDetailsDto.getMaxValue(),receivedValue,">")?alarmDetailsDto.getMaxValue():receivedValue); // update.add(alarmDetailsDto); alarmDetailsDto.setDateUpdate( new Date()); alarmDetailMapper.update(alarmDetailsDto); update.add(alarmDetailsDto); //已存在的修改后从集合移除 alarmDetailsDtoMap.remove(deviceCheckAlarmDto.getAlarmSettingId()); }else{ AlarmDetailsEntity alarmDetailsEntity = new AlarmDetailsDto(); alarmDetailsEntity.setTenantId(deviceCheckAlarmDto.getTenantId()); alarmDetailsEntity.setParentSceneId(deviceCheckAlarmDto.getParentSceneId()); alarmDetailsEntity.setParentSceneName(deviceCheckAlarmDto.getParentSceneName()); alarmDetailsEntity.setSceneId(deviceCheckAlarmDto.getSceneId()); alarmDetailsEntity.setSceneName(deviceCheckAlarmDto.getSceneName()); alarmDetailsEntity.setDeviceId(deviceCheckAlarmDto.getDeviceId()); alarmDetailsEntity.setCompanyOrgId(deviceCheckAlarmDto.getCompanyOrgId()); alarmDetailsEntity.setDeptOrgId(deviceCheckAlarmDto.getDeptOrgId()); alarmDetailsEntity.setAlarmType(deviceCheckAlarmDto.getAlarmType()); alarmDetailsEntity.setAttributeId(deviceCheckAlarmDto.getAttributeId()); alarmDetailsEntity.setAlarmValue(receivedValue); alarmDetailsEntity.setAlarmStartTime(receiveDateTime); alarmDetailsEntity.setAlarmContent(deviceCheckAlarmDto.getAlarminfo(receivedValue)); alarmDetailsEntity.setAlarmSettingId(deviceCheckAlarmDto.getAlarmSettingId()); alarmDetailsEntity.setState(1); alarmDetailsEntity.setOpState(1); alarmDetailsEntity.setMinValue(receivedValue); alarmDetailsEntity.setMaxValue(receivedValue); alarmDetailsEntity.setStatus(1); alarmDetailsEntity.setDateCreate(new Date()); alarmDetailsEntity.setDateUpdate(new Date()); alarmDetailsEntity.setCreateBy("system"); alarmDetailsEntity.setUpdateBy("system"); Message message=new Message(); message.setStatus(1); message.setTenantId(deviceCheckAlarmDto.getTenantId()); message.setMessageId(UUID.randomUUID().toString()); JSONObject jsonContent = new JSONObject(); //${场景名称}${设备名称}【${报警字段}】异常报警 jsonContent.put("场景名称",deviceCheckAlarmDto.getSceneName()); jsonContent.put("设备名称",deviceCheckAlarmDto.getDeviceName()); jsonContent.put("报警字段",deviceCheckAlarmDto.getAttributeName()); message.setMessageContent(jsonContent.toJSONString()); //消息内容,如果需要动态使用,配合模板使用{key:value} message.setMessageType(1); //消息类型 message.setMessageTemplateId(1); //模板id message.setChannel(0); //渠道 Integer companyOrgId = deviceCheckAlarmDto.getCompanyOrgId(); Integer departmentOrgId = deviceCheckAlarmDto.getDeptOrgId(); try{ List taskUsers = userCenterClient.findUserIdsByPermissonOrg(deviceCheckAlarmDto.getTenantId(),companyOrgId,departmentOrgId); if(taskUsers!=null){ taskUsers.forEach(id->{ message.setUserId(id); messageSendUtil.send(message); }); } }catch(Exception e){ e.printStackTrace(); log.info("推送报警消息失败:{}",e.getMessage()); } insert.add(alarmDetailsEntity); } } } //处理完成后,剩下的标记为历史数据 for(AlarmDetailsDto alarmDetailsDto:alarmDetailsDtoMap.values()){ alarmDetailsDto.setState(0); alarmDetailsDto.setAlarmEndTime(receiveDateTime); alarmDetailsDto.setDateUpdate(new Date()); alarmDetailMapper.update(alarmDetailsDto); } //批量插入新增报警 if(insert.size()>0){ alarmDetailMapper.batchInsert(insert); } //报警恢复的删除工单 Map AlarmDtoNewsMap = insert.stream().collect(Collectors.toMap(AlarmDetailsEntity::getDeviceId, a -> a,(k1, k2)->k1)); Map AlarmDtoOldsMap = update.stream().collect(Collectors.toMap(AlarmDetailsEntity::getDeviceId, a -> a,(k1, k2)->k1)); Set deviceIds = alarmDetailsDtoMap.values().stream() .filter(a -> !(AlarmDtoNewsMap.containsKey(a.getDeviceId())||AlarmDtoOldsMap.containsKey(a.getDeviceId()))) .map(a -> a.getDeviceId().longValue()) .collect(Collectors.toSet()); if(deviceIds.size()>0){ operationManagerClient.batchDeleteByAlarms(new ArrayList<>(deviceIds)); } } } } }