package com.huaxu.rabbitmq;

import com.alibaba.fastjson.JSONObject;
import com.huaxu.common.CalcUtil;
import com.huaxu.common.StringUtils;
import com.huaxu.dao.AlarmDetailMapper;
import com.huaxu.dto.AlarmDetailsDto;
import com.huaxu.dto.DeviceCheckAlarmDto;
import com.huaxu.entity.AlarmDetailsEntity;
import com.huaxu.entity.MonitorDataEntity;
import com.huaxu.entity.MonitorDataValueEntity;
import com.huaxu.service.MonitorDataService;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.stream.Collectors;

/**
 * Listens for messages on the configured RabbitMQ queue, stores the reported device
 * monitoring values and maintains the parameter-alarm records derived from them.
 *
 * @author wangli
 * @date 2020-11-18 21:23
 */
@Component
@Slf4j
public class ReceiveData {

    /** Format of the {@code eventTime} field in incoming messages. */
    private static final String EVENT_TIME_PATTERN = "yyyy-MM-dd HH:mm:ss";

    /** Alarm type passed to the mapper queries (runtime value, must not be translated). */
    private static final String PARAMETER_ALARM = "参数报警";

    /** Value written to the createBy/updateBy columns of alarms created here. */
    private static final String SYSTEM_USER = "system";

    @Autowired
    private AmqpTemplate rabbitTemplate;

    @Resource
    private AlarmDetailMapper alarmDetailMapper;

    @Value("${spring.rabbitmq.listener.queue}")
    private String rabbitmqQueue;

    @Autowired
    private MonitorDataService monitorDataService;

    /**
     * Exposes the injected queue name as a bean so that it can be referenced through
     * SpEL ({@code #{rabbitmqQueue}}) in the listener annotation below.
     *
     * @return the configured queue name
     */
    @Bean
    public String rabbitmqQueue() {
        return rabbitmqQueue;
    }

    /**
     * Listener entry point: logs the raw message and hands it to
     * {@link #receivedDataHandle(byte[])}.
     *
     * <p>When processing throws, the stack trace is logged and the same payload is sent
     * back to the queue.
     * NOTE(review): a message that fails deterministically will be re-queued on every
     * attempt; consider a retry limit or dead-letter queue.
     *
     * @param receivedData raw message body (JSON text, decoded as UTF-8)
     */
    @RabbitHandler
    @RabbitListener(queues = "#{rabbitmqQueue}")
    public void received(byte[] receivedData) {
        try {
            log.info("rabbitMq接收消息:{}", new String(receivedData, StandardCharsets.UTF_8));
            receivedDataHandle(receivedData);
        } catch (Exception e) {
            log.error(ExceptionUtils.getStackTrace(e));
            // On failure the message is returned to the queue.
            rabbitTemplate.convertAndSend(rabbitmqQueue, receivedData);
        }
    }

    /**
     * Parses one device message, saves the reported values and updates alarm records.
     *
     * <p>Example message:
     * <pre>
     * {"agentIdentifier":"balikun_lvyuantest_tcp_agent",
     *  "eventTime":"2020-11-25 19:05:56","id":781234302422745088,
     *  "manufacturer":"lvyuantest","mode":"TCP-JSON",
     *  "originalData":"{\"temp\":0,\"currentPress\":30,...}",
     *  "originalDataFormat":"application/json",
     *  "parsedData":"{\"temp\":0,\"currentPress\":30,\"positiveFlow\":50,...}",
     *  "type":"水质监测仪","unitIdentifier":"HX_DH-001"}
     * </pre>
     *
     * <p>The message is silently ignored when it is empty, lacks {@code eventTime},
     * {@code unitIdentifier} or {@code parsedData}, carries an unparseable time or a
     * blank device code, or when no device (or no device attributes) is found for the code.
     *
     * @param receivedData raw message body (JSON text, decoded as UTF-8)
     */
    public void receivedDataHandle(byte[] receivedData) {
        if (receivedData == null || receivedData.length == 0) {
            return;
        }
        JSONObject message = JSONObject.parseObject(new String(receivedData, StandardCharsets.UTF_8));
        // Only messages that carry a time, a device code and data are processed.
        if (message == null
                || !message.containsKey("eventTime")
                || !message.containsKey("unitIdentifier")
                || !message.containsKey("parsedData")) {
            return;
        }

        Date receiveDateTime = parseEventTime(message.getString("eventTime"));
        if (receiveDateTime == null) {
            return;
        }

        String deviceCode = message.getString("unitIdentifier");
        JSONObject receiveData = JSONObject.parseObject(message.getString("parsedData"));
        // A null/empty parsedData yields no object; there is nothing to store in that case.
        if (StringUtils.isBlank(deviceCode) || receiveData == null) {
            return;
        }

        MonitorDataEntity monitorDataEntity = monitorDataService.getDeviceMonitorInfoByDeviceCode(deviceCode);
        // Device not found, or the device has no attributes.
        if (monitorDataEntity == null
                || monitorDataEntity.getDataValues() == null
                || monitorDataEntity.getDataValues().size() == 0) {
            return;
        }

        saveMonitorData(monitorDataEntity, receiveData, receiveDateTime);

        // Update the device's last report time.
        alarmDetailMapper.udpateLastUpdateTime(monitorDataEntity.getDeviceId(), receiveDateTime);

        checkParameterAlarms(monitorDataEntity, receiveData, receiveDateTime);
    }

    /**
     * Parses the message time; logs and returns {@code null} when it is blank or malformed.
     */
    private Date parseEventTime(String eventTime) {
        if (StringUtils.isBlank(eventTime)) {
            log.error("报警信息查询时间转换错误,原数据:eventTime:{},异常信息:{}", eventTime, "eventTime is blank");
            return null;
        }
        try {
            // SimpleDateFormat is not thread-safe, so a fresh instance is used per call.
            return new SimpleDateFormat(EVENT_TIME_PATTERN).parse(eventTime);
        } catch (ParseException e) {
            log.error("报警信息查询时间转换错误,原数据:eventTime:{},异常信息:{}", eventTime, e.getMessage());
            return null;
        }
    }

    /**
     * Copies the reported values onto the device attributes, stamps the collection time
     * (plus its year/month/day/hour parts) and saves the entity.
     */
    private void saveMonitorData(MonitorDataEntity monitorDataEntity, JSONObject receiveData, Date receiveDateTime) {
        List<MonitorDataValueEntity> dataValues = monitorDataEntity.getDataValues();
        for (MonitorDataValueEntity dataValue : dataValues) {
            // Attributes missing from the message get a null value. TODO: unit conversion.
            dataValue.setDataValue(receiveData.getDouble(dataValue.getIdentifier()));
        }

        monitorDataEntity.setCollectDate(receiveDateTime);
        Calendar cal = Calendar.getInstance();
        cal.setTime(receiveDateTime);
        monitorDataEntity.setYear(cal.get(Calendar.YEAR));
        monitorDataEntity.setMonth(cal.get(Calendar.MONTH) + 1); // Calendar months are 0-based
        monitorDataEntity.setDay(cal.get(Calendar.DAY_OF_MONTH));
        monitorDataEntity.setHour(cal.get(Calendar.HOUR_OF_DAY));
        monitorDataService.save(monitorDataEntity);
    }

    /**
     * Evaluates the device's alarm settings against the reported values: updates alarms
     * that are still firing, closes active alarms that were not re-triggered by this
     * message, and batch-inserts newly raised alarms.
     */
    private void checkParameterAlarms(MonitorDataEntity monitorDataEntity, JSONObject receiveData, Date receiveDateTime) {
        // Alarm conditions that need to be checked for this device.
        List<DeviceCheckAlarmDto> alarmRules =
                alarmDetailMapper.selectDeviceForCheckAlarm(monitorDataEntity.getDeviceId(), PARAMETER_ALARM);
        boolean hasRules = alarmRules != null
                && !alarmRules.isEmpty()
                && StringUtils.isNotBlank(alarmRules.get(0).getIdentifiter());
        if (!hasRules) {
            return;
        }

        Integer deviceId = alarmRules.get(0).getDeviceId();
        // Alarms that already exist in the system for this device.
        List<AlarmDetailsDto> activeAlarms = alarmDetailMapper.selectStateAlarm(deviceId, PARAMETER_ALARM);
        // Indexed by alarmSettingId for matching; on duplicate ids the first one wins.
        // The key is typed as Object because the id's declared type is not visible in this file.
        Map<Object, AlarmDetailsDto> activeBySettingId = activeAlarms.stream()
                .collect(Collectors.toMap(AlarmDetailsDto::getAlarmSettingId, a -> a, (k1, k2) -> k1));

        List<AlarmDetailsEntity> newAlarms = new ArrayList<>();
        for (DeviceCheckAlarmDto rule : alarmRules) {
            Double receivedValue = receiveData.getDouble(rule.getIdentifiter());
            if (!rule.checkdeviceAttributeAlarm(receivedValue)) {
                continue;
            }
            // An alarm that is still firing is taken out of the map so that it is not closed below.
            AlarmDetailsDto existing = activeBySettingId.remove(rule.getAlarmSettingId());
            if (existing != null) {
                updateExistingAlarm(existing, rule, receivedValue);
            } else {
                newAlarms.add(buildNewAlarm(rule, receivedValue, receiveDateTime));
            }
        }

        // Whatever is left was not re-triggered: mark it as historical.
        for (AlarmDetailsDto finished : activeBySettingId.values()) {
            finished.setState(0);
            finished.setAlarmEndTime(receiveDateTime);
            finished.setDateUpdate(new Date());
            alarmDetailMapper.update(finished);
        }

        // Batch-insert the newly raised alarms.
        if (!newAlarms.isEmpty()) {
            alarmDetailMapper.batchInsert(newAlarms);
        }
    }

    /**
     * Refreshes an already-active alarm with the latest value, widens its min/max range
     * if needed and persists it.
     */
    private void updateExistingAlarm(AlarmDetailsDto existing, DeviceCheckAlarmDto rule, Double receivedValue) {
        existing.setAlarmValue(receivedValue);
        existing.setAlarmContent(rule.getAlarminfo(receivedValue));
        existing.setMinValue(CalcUtil.compareBySign(existing.getMinValue(), receivedValue, "<")
                ? existing.getMinValue() : receivedValue);
        existing.setMaxValue(CalcUtil.compareBySign(existing.getMaxValue(), receivedValue, ">")
                ? existing.getMaxValue() : receivedValue);
        existing.setDateUpdate(new Date());
        alarmDetailMapper.update(existing);
    }

    /**
     * Builds a new alarm record for a rule that has just started firing.
     */
    private AlarmDetailsEntity buildNewAlarm(DeviceCheckAlarmDto rule, Double receivedValue, Date receiveDateTime) {
        Date now = new Date();
        AlarmDetailsEntity alarm = new AlarmDetailsDto();
        alarm.setTenantId(rule.getTenantId());
        alarm.setDeviceId(rule.getDeviceId());
        alarm.setCompanyOrgId(rule.getCompanyOrgId());
        alarm.setDeptOrgId(rule.getDeptOrgId());
        alarm.setAlarmType(rule.getAlarmType());
        alarm.setAttributeId(rule.getAttributeId());
        alarm.setAlarmValue(receivedValue);
        alarm.setAlarmStartTime(receiveDateTime);
        alarm.setAlarmContent(rule.getAlarminfo(receivedValue));
        alarm.setAlarmSettingId(rule.getAlarmSettingId());
        alarm.setState(1);
        alarm.setOpState(1);
        alarm.setMinValue(receivedValue);
        alarm.setMaxValue(receivedValue);
        alarm.setStatus(1);
        alarm.setDateCreate(now);
        alarm.setDateUpdate(now);
        alarm.setCreateBy(SYSTEM_USER);
        alarm.setUpdateBy(SYSTEM_USER);
        return alarm;
    }
}