package com.huaxu.rabbitmq;

import com.alibaba.fastjson.JSONObject;
import com.huaxu.client.OperationManagerClient;
import com.huaxu.client.UserCenterClient;
import com.huaxu.common.CalcUtil;
import com.huaxu.common.StringUtils;
import com.huaxu.dao.AlarmDetailMapper;
import com.huaxu.dto.AlarmDetailsDto;
import com.huaxu.dto.DeviceCheckAlarmDto;
import com.huaxu.dto.WorkOrderManageByAlarmDto;
import com.huaxu.entity.AlarmDetailsEntity;
import com.huaxu.entity.Message;
import com.huaxu.entity.MonitorDataEntity;
import com.huaxu.entity.MonitorDataValueEntity;
import com.huaxu.service.MonitorDataService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.awt.*;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.List;
import java.util.stream.Collectors;

/**
 * Handles device-reading messages taken from a RabbitMQ queue: stores the
 * reading, evaluates the device's "参数报警" (parameter alarm) rules against it,
 * raises / updates / closes alarm records and keeps the related work order in
 * sync.
 *
 * @author wangli
 * @since 2020-11-18 21:23
 */
@Component
@Slf4j
public class ReceiveData {

    /** Alarm type passed to the alarm mapper queries issued by this class. */
    private static final String PARAMETER_ALARM = "参数报警";

    /** Format of the {@code eventTime} field of an incoming message. */
    private static final String EVENT_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** Readings are rounded to this many decimal places before being stored. */
    private static final int DATA_VALUE_SCALE = 5;

    @Resource(name = "messageRabbitTemplate")
    private AmqpTemplate messageRabbitTemplate;

    @Resource(name = "receiveDataRabbitTemplate")
    private AmqpTemplate receiveDataRabbitTemplate;

    @Resource
    private AlarmDetailMapper alarmDetailMapper;

    @Autowired
    private UserCenterClient userCenterClient;

    @Value("${receiveData.spring.rabbitmq.listener.queue}")
    private String rabbitmqQueue;

    @Autowired
    private MonitorDataService monitorDataService;

    @Autowired
    private OperationManagerClient operationManagerClient;

    @Value("${receive.exchange.name}")
    private String receiveExchangeName;

    @Value("${dispath.routing.key}")
    private String dispathRoutingKey;

    /**
     * Returns the configured queue name. The value is injected first so that
     * it can then be read through SpEL (see the disabled listener annotation
     * on {@link #received(byte[])}).
     *
     * @return the queue name injected from configuration
     */
    //@Bean
    public String rabbitmqQueue() {
        return rabbitmqQueue;
    }

    /**
     * Publishes {@code message}, serialised as JSON, to the configured
     * exchange / routing key.
     *
     * @param message the notification to publish
     */
    private void send(Message message) {
        log.debug("消息发送 exchange={} routingkey={} 用户id={}", receiveExchangeName, dispathRoutingKey, message.getUserId());
        messageRabbitTemplate.convertAndSend(receiveExchangeName, dispathRoutingKey, JSONObject.toJSONString(message));
    }

    /**
     * Entry point for a raw queue message. Any exception raised while handling
     * the message is logged and swallowed. The listener annotations below are
     * currently commented out, so nothing in this class subscribes to a queue.
     *
     * @param receivedData the raw message body (UTF-8 encoded JSON)
     */
    //@RabbitHandler
    //@RabbitListener(queues = "#{rabbitmqQueue}",containerFactory = "receiveDataContainerFactory")
    public void received(byte[] receivedData) {
        try {
            if (log.isDebugEnabled()) {
                log.debug("rabbitMq接收消息:{}", new String(receivedData, StandardCharsets.UTF_8));
            }
            receivedDataHandle(receivedData);
        } catch (Exception e) {
            log.error(ExceptionUtils.getStackTrace(e));
            // Re-queueing the message on failure is currently disabled:
            // receiveDataRabbitTemplate.convertAndSend(rabbitmqQueue, receivedData);
        }
    }

    /**
     * Parses one device message, stores the reading and evaluates alarms.
     *
     * <p>Example message:
     * <pre>
     * {"agentIdentifier":"balikun_lvyuantest_tcp_agent",
     *  "eventTime":"2020-11-25 19:05:56","id":781234302422745088,
     *  "manufacturer":"lvyuantest","mode":"TCP-JSON",
     *  "originalData":"{\"temp\":0,\"currentPress\":30,...}",
     *  "originalDataFormat":"application/json",
     *  "parsedData":"{\"temp\":0,\"currentPress\":30,\"positiveFlow\":50,...}",
     *  "type":"水质监测仪","unitIdentifier":"HX_DH-001"}
     * </pre>
     *
     * <p>The message is ignored (the method returns without side effects) when
     * it lacks {@code eventTime}, {@code unitIdentifier} or {@code parsedData},
     * when {@code eventTime} cannot be parsed, when the device code is blank,
     * when the device (or its attribute list) is unknown, or when none of the
     * device's attributes appear in {@code parsedData}.
     *
     * @param receivedData the raw message body (UTF-8 encoded JSON)
     */
    public void receivedDataHandle(byte[] receivedData) {
        JSONObject jsonObject = JSONObject.parseObject(new String(receivedData, StandardCharsets.UTF_8));
        // Only messages carrying a time, a device code and data are processed.
        if (jsonObject == null
                || !jsonObject.containsKey("eventTime")
                || !jsonObject.containsKey("unitIdentifier")
                || !jsonObject.containsKey("parsedData")) {
            return;
        }

        Date receiveDateTime;
        try {
            receiveDateTime = new SimpleDateFormat(EVENT_TIME_PATTERN).parse(jsonObject.getString("eventTime"));
        } catch (ParseException e) {
            // Log the offending value itself, not merely whether the key exists.
            log.error("报警信息查询时间转换错误,原数据:eventTime:{},异常信息:{}", jsonObject.getString("eventTime"), e.getMessage());
            return;
        }

        String deviceCode = jsonObject.getString("unitIdentifier");
        JSONObject receiveData = JSONObject.parseObject(jsonObject.getString("parsedData"));
        if (StringUtils.isBlank(deviceCode) || receiveData == null) {
            return;
        }

        MonitorDataEntity monitorDataEntity = monitorDataService.getDeviceMonitorInfoByDeviceCode(deviceCode);
        // Unknown device, or a device without attributes.
        if (monitorDataEntity == null
                || monitorDataEntity.getDataValues() == null
                || monitorDataEntity.getDataValues().size() == 0) {
            return;
        }

        // No attribute matched: treat the message as garbage and ignore it.
        if (applyReadings(monitorDataEntity.getDataValues(), receiveData) == 0) {
            return;
        }

        storeReading(monitorDataEntity, receiveDateTime);
        evaluateAlarms(monitorDataEntity, receiveData, receiveDateTime);
    }

    /**
     * Copies every reading whose identifier is present in {@code receiveData}
     * onto the matching attribute, rounded half-up to
     * {@value #DATA_VALUE_SCALE} decimals.
     *
     * @return the number of attributes that received a value
     */
    private int applyReadings(List<MonitorDataValueEntity> attributes, JSONObject receiveData) {
        int matched = 0;
        for (MonitorDataValueEntity attribute : attributes) {
            String identifier = attribute.getIdentifier();
            if (!receiveData.containsKey(identifier)) {
                continue;
            }
            Double raw = receiveData.getDouble(identifier);
            if (raw == null) {
                // Key present but value is JSON null: there is no reading to store.
                continue;
            }
            // valueOf goes through the double's decimal string form, so the
            // rounding below operates on the value as it was written.
            attribute.setDataValue(
                    BigDecimal.valueOf(raw).setScale(DATA_VALUE_SCALE, RoundingMode.HALF_UP).doubleValue());
            matched++;
        }
        return matched;
    }

    /** Stamps the reading with its collection time, saves it and updates the device's last-report time. */
    private void storeReading(MonitorDataEntity monitorDataEntity, Date receiveDateTime) {
        monitorDataEntity.setCollectDate(receiveDateTime);
        Calendar cal = Calendar.getInstance();
        cal.setTime(receiveDateTime);
        monitorDataEntity.setYear(cal.get(Calendar.YEAR));
        monitorDataEntity.setMonth(cal.get(Calendar.MONTH) + 1); // Calendar months are 0-based
        monitorDataEntity.setDay(cal.get(Calendar.DAY_OF_MONTH));
        monitorDataEntity.setHour(cal.get(Calendar.HOUR_OF_DAY));
        monitorDataService.save(monitorDataEntity);

        alarmDetailMapper.udpateLastUpdateTime(monitorDataEntity.getDeviceId(), receiveDateTime);
    }

    /**
     * Checks the reading against the device's alarm rules. A triggered rule
     * updates its open alarm or creates a new one (and notifies users); open
     * alarms whose rule was evaluated and did not trigger are closed.
     */
    private void evaluateAlarms(MonitorDataEntity monitorDataEntity, JSONObject receiveData, Date receiveDateTime) {
        List<DeviceCheckAlarmDto> alarmRules =
                alarmDetailMapper.selectDeviceForCheckAlarm(monitorDataEntity.getDeviceId(), PARAMETER_ALARM);
        // Nothing to do unless alarm parameters are configured.
        if (alarmRules == null || alarmRules.isEmpty()
                || !StringUtils.isNotBlank(alarmRules.get(0).getIdentifiter())) {
            return;
        }

        Integer deviceId = alarmRules.get(0).getDeviceId();
        // Alarms already recorded for the device, indexed by alarm-setting id.
        // The first record wins when several share an id. The key type is
        // whatever getAlarmSettingId() returns (not visible in this file).
        List<AlarmDetailsDto> existingAlarms = alarmDetailMapper.selectStateAlarm(deviceId, PARAMETER_ALARM, null);
        Map<Object, AlarmDetailsDto> openAlarms = new HashMap<>();
        for (AlarmDetailsDto existing : existingAlarms) {
            openAlarms.putIfAbsent(existing.getAlarmSettingId(), existing);
        }

        List<AlarmDetailsEntity> insert = new ArrayList<>();
        for (DeviceCheckAlarmDto rule : alarmRules) {
            Double receivedValue = receiveData.getDouble(rule.getIdentifiter());
            if (receivedValue == null) {
                // The attribute was not reported in this message, which is
                // neither an alarm nor a recovery: keep any open alarm as is.
                openAlarms.remove(rule.getAlarmSettingId());
                continue;
            }
            if (!rule.checkdeviceAttributeAlarm(receivedValue)) {
                continue;
            }

            // Handled alarms leave the map; whatever remains has recovered.
            AlarmDetailsDto open = openAlarms.remove(rule.getAlarmSettingId());
            if (open != null) {
                refreshOpenAlarm(open, receivedValue);
            } else {
                AlarmDetailsEntity created = newAlarm(rule, receivedValue, receiveDateTime);
                notifyUsers(rule);
                insert.add(created);
            }
        }

        // Batch-insert the newly raised alarms.
        if (insert.size() > 0) {
            alarmDetailMapper.batchInsert(insert);
        }

        if (!openAlarms.isEmpty()) {
            // Close recovered alarms regardless of whether a work order exists;
            // only the work-order clean-up depends on finding one.
            for (AlarmDetailsDto recovered : openAlarms.values()) {
                recovered.setState(0);
                recovered.setAlarmEndTime(receiveDateTime);
                recovered.setDateUpdate(new Date());
                alarmDetailMapper.update(recovered);
            }
            syncWorkOrder(monitorDataEntity.getDeviceId().longValue(), openAlarms.values());
        }
    }

    /**
     * Records the latest value on an already-open alarm and persists it.
     * NOTE(review): min/max handling assumes CalcUtil.compareBySign(a, b, sign)
     * evaluates "a sign b" - confirm against CalcUtil.
     */
    private void refreshOpenAlarm(AlarmDetailsDto open, Double receivedValue) {
        open.setAlarmValue(receivedValue);
        open.setMinValue(CalcUtil.compareBySign(open.getMinValue(), receivedValue, "<")
                ? open.getMinValue() : receivedValue);
        open.setMaxValue(CalcUtil.compareBySign(open.getMaxValue(), receivedValue, ">")
                ? open.getMaxValue() : receivedValue);
        open.setDateUpdate(new Date());
        alarmDetailMapper.update(open);
    }

    /** Builds (without persisting) a new alarm record for a rule that has just triggered. */
    private AlarmDetailsEntity newAlarm(DeviceCheckAlarmDto rule, Double receivedValue, Date startTime) {
        Date now = new Date();
        AlarmDetailsEntity alarm = new AlarmDetailsDto();
        alarm.setTenantId(rule.getTenantId());
        alarm.setParentSceneId(rule.getParentSceneId());
        alarm.setParentSceneName(rule.getParentSceneName());
        alarm.setSceneId(rule.getSceneId());
        alarm.setSceneName(rule.getSceneName());
        alarm.setDeviceId(rule.getDeviceId());
        alarm.setCompanyOrgId(rule.getCompanyOrgId());
        alarm.setDeptOrgId(rule.getDeptOrgId());
        alarm.setAlarmType(rule.getAlarmType());
        alarm.setAttributeId(rule.getAttributeId());
        alarm.setAlarmValue(receivedValue);
        alarm.setAlarmStartTime(startTime);
        alarm.setAlarmContent(rule.getAlarminfo(receivedValue));
        alarm.setAlarmSettingId(rule.getAlarmSettingId());
        alarm.setState(1);
        alarm.setOpState(1);
        alarm.setMinValue(receivedValue);
        alarm.setMaxValue(receivedValue);
        alarm.setStatus(1);
        alarm.setDateCreate(now);
        alarm.setDateUpdate(now);
        alarm.setCreateBy("system");
        alarm.setUpdateBy("system");
        return alarm;
    }

    /**
     * Sends an alarm notification to every user returned by the user centre
     * for the rule's tenant / company / department. Failures are logged and do
     * not interrupt alarm processing.
     */
    private void notifyUsers(DeviceCheckAlarmDto rule) {
        Message message = new Message();
        message.setStatus(1);
        message.setTenantId(rule.getTenantId());
        message.setMessageId(UUID.randomUUID().toString());

        // Values for the message template; the keys are the template's
        // placeholders: ${场景名称}${设备名称}【${报警字段}】异常报警
        JSONObject jsonContent = new JSONObject();
        jsonContent.put("场景名称", rule.getSceneName());
        jsonContent.put("设备名称", rule.getDeviceName());
        jsonContent.put("报警字段", rule.getAttributeName());
        message.setMessageContent(jsonContent.toJSONString());
        message.setMessageType(1);       // message type
        message.setMessageTemplateId(1); // template id
        message.setChannel(0);           // channel

        try {
            // A null result means there is nobody to notify.
            Optional.ofNullable(userCenterClient.findUserIdsByPermissonOrg(
                            rule.getTenantId(), rule.getCompanyOrgId(), rule.getDeptOrgId()))
                    .ifPresent(userIds -> userIds.forEach(id -> {
                        message.setUserId(id);
                        send(message);
                    }));
        } catch (Exception e) {
            log.error("推送报警消息失败:{}", e.getMessage(), e);
        }
    }

    /**
     * Removes the content of the recovered alarms from the description of the
     * device's first work order. If that order has no status or status 0 and
     * its description ends up blank, the device's work orders are deleted;
     * otherwise the work orders are updated.
     * NOTE(review): status null/0 is taken to mean "not yet dispatched", as the
     * original comment stated - confirm against the work-order service.
     */
    private void syncWorkOrder(long deviceId, Collection<AlarmDetailsDto> recoveredAlarms) {
        List<Long> deviceIds = new ArrayList<>();
        deviceIds.add(deviceId);
        List<WorkOrderManageByAlarmDto> workOrders = operationManagerClient.findWorkOrderByDeviceIds(deviceIds);
        if (workOrders == null || workOrders.isEmpty() || workOrders.get(0) == null) {
            return;
        }

        WorkOrderManageByAlarmDto workOrder = workOrders.get(0);
        String taskDesc = workOrder.getTaskDesc() == null ? "" : workOrder.getTaskDesc();
        for (AlarmDetailsDto recovered : recoveredAlarms) {
            String content = recovered.getAlarmContent();
            if (content == null || content.isEmpty()) {
                // Nothing to strip; an empty content would otherwise remove
                // every ';' from the description.
                continue;
            }
            taskDesc = taskDesc.replace(";" + content, "").replace(content, "");
        }
        workOrder.setTaskDesc(taskDesc);

        boolean notDispatched = workOrder.getOrderStatus() == null || workOrder.getOrderStatus() == 0;
        if (notDispatched && StringUtils.isBlank(taskDesc)) {
            // No remaining alarm text on an order that was never dispatched.
            operationManagerClient.batchDeleteByAlarms(new ArrayList<>(deviceIds));
        } else {
            operationManagerClient.updateByAlarms(workOrders);
        }
    }
}