123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204 |
- package com.huaxu.rabbitmq;
- /**
- * @description 监听rabbitmq队列消息,获取设备报警信息
- * @auto wangli
- * @data 2020-11-18 21:23
- */
- import com.alibaba.fastjson.JSONObject;
- import com.huaxu.common.CalcUtil;
- import com.huaxu.common.StringUtils;
- import com.huaxu.dao.AlarmDetailMapper;
- import com.huaxu.dto.AlarmDetailsDto;
- import com.huaxu.dto.DeviceCheckAlarmDto;
- import com.huaxu.entity.AlarmDetailsEntity;
- import com.huaxu.entity.MonitorDataEntity;
- import com.huaxu.entity.MonitorDataValueEntity;
- import com.huaxu.service.MonitorDataService;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.lang.exception.ExceptionUtils;
- import org.springframework.amqp.core.AmqpTemplate;
- import org.springframework.amqp.rabbit.annotation.RabbitHandler;
- import org.springframework.amqp.rabbit.annotation.RabbitListener;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.context.annotation.Bean;
- import org.springframework.stereotype.Component;
- import javax.annotation.Resource;
- import java.text.ParseException;
- import java.text.SimpleDateFormat;
- import java.util.*;
- import java.util.stream.Collectors;
- @Component
- @Slf4j
- public class ReceiveData {
- @Autowired
- private AmqpTemplate rabbitTemplate;
- @Resource
- private AlarmDetailMapper alarmDetailMapper;
- @Value("${spring.rabbitmq.listener.queue}")
- private String rabbitmqQueue;
- @Autowired
- private MonitorDataService monitorDataService;
- /**
- * 先注入,然后再通过SPEL取值
- * @return
- */
- @Bean
- public String rabbitmqQueue(){
- return rabbitmqQueue;
- }
- @RabbitHandler
- @RabbitListener(queues = "#{rabbitmqQueue}")
- public void received(byte[] receivedData) {
- try {
- log.info("rabbitMq接收消息:"+new String(receivedData));
- receivedDataHandle(receivedData);
- } catch (Exception e) {
- log.error(ExceptionUtils.getStackTrace(e));
- // 发送异常时消息返回队列
- rabbitTemplate.convertAndSend(rabbitmqQueue, receivedData);
- }
- }
- public void receivedDataHandle(byte[] receivedData){
- //{"agentIdentifier":"balikun_lvyuantest_tcp_agent",
- //"eventTime":"2020-11-25 19:05:56","id":781234302422745088,"manufacturer":"lvyuantest","mode":"TCP-JSON",
- //"originalData":"{\"temp\":0,\"currentPress\":30,\"positiveFlow\":50,\"meterElectricity\":70,\"reverseFlow\":60,\"PH\":8.474,\"currentFlow\":20,\"VOL_STATUS\":1,\"EC\":0,\"currentFlowRate\":40}",
- //"originalDataFormat":"application/json",
- //"parsedData":"{\"temp\":0,\"currentPress\":30,\"positiveFlow\":50,\"meterElectricity\":70,\"reverseFlow\":60,\"PH\":8.474,\"currentFlow\":20,\"VOL_STATUS\":1,\"EC\":0,\"currentFlowRate\":40}",
- //"type":"水质监测仪","unitIdentifier":"HX_DH-001"}
- JSONObject jsonObject = JSONObject.parseObject(new String(receivedData));
- //对象有时间、有设备编码、有数据则解析
- if(jsonObject.containsKey("eventTime")&&jsonObject.containsKey("unitIdentifier")&&jsonObject.containsKey("parsedData")){
- SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- Date receiveDateTime ;
- try {
- receiveDateTime = f.parse(jsonObject.getString("eventTime"));
- }catch (ParseException e){
- log.error("报警信息查询时间转换错误,原数据:eventTime:{},异常信息:{}",jsonObject.containsKey("eventTime"),e.getMessage());
- return;
- }
- String devcieCode =jsonObject .getString("unitIdentifier");
- JSONObject receiveData =JSONObject.parseObject(jsonObject .getString("parsedData"));
- //保存报警数据
- if(StringUtils.isBlank(devcieCode)){
- return ;
- }
- List<MonitorDataEntity> list = new ArrayList<>();
- //暂时测试共用一个设备数据
- MonitorDataEntity monitorDataEntity = monitorDataService.getDeviceMonitorInfoByDeviceCode(devcieCode);
- //
- //查询不到设备或者设备属性为空
- if(monitorDataEntity == null
- || monitorDataEntity.getDataValues() == null
- || monitorDataEntity.getDataValues().size() == 0){
- return ;
- }
- List<MonitorDataValueEntity> monitorDataValueEntities= monitorDataEntity.getDataValues();
- for(MonitorDataValueEntity monitorDataValueEntity :monitorDataValueEntities){
- monitorDataValueEntity.setDataValue(receiveData.getDouble(monitorDataValueEntity.getIdentifier()));
- //单位问题处理
- }
- monitorDataEntity.setCollectDate(receiveDateTime);
- Calendar cal = Calendar.getInstance();
- cal.setTime(receiveDateTime);
- monitorDataEntity.setYear(cal.get(Calendar.YEAR));
- monitorDataEntity.setMonth(cal.get(Calendar.MONTH)+1);
- monitorDataEntity.setDay(cal.get(Calendar.DAY_OF_MONTH));
- monitorDataEntity.setHour(cal.get(Calendar.HOUR_OF_DAY));
- monitorDataService.save(monitorDataEntity);
- //修改设备上报时间
- alarmDetailMapper.udpateLastUpdateTime(monitorDataEntity.getDeviceId(),receiveDateTime);
- //获取需要验证的报警条件
- List<DeviceCheckAlarmDto> deviceCheckAlarmDtos = alarmDetailMapper.selectDeviceForCheckAlarm(monitorDataEntity.getDeviceId(),"参数报警");
- //有设置报警参数
- if(deviceCheckAlarmDtos.size()>0 && StringUtils.isNotBlank(deviceCheckAlarmDtos.get(0).getIdentifiter())){
- Integer deviceId= deviceCheckAlarmDtos.get(0).getDeviceId();
- //系统中已存在的报警信息
- List<AlarmDetailsDto> alarmDetailsDtos = alarmDetailMapper.selectStateAlarm(deviceId,"参数报警");
- //已存在的报警信息转为map方便匹配alarmSettingId
- Map<Integer,AlarmDetailsDto> alarmDetailsDtoMap = alarmDetailsDtos.stream().collect(Collectors.toMap(AlarmDetailsDto::getAlarmSettingId, a -> a,(k1, k2)->k1));
- List<AlarmDetailsEntity> insert =new ArrayList<>();
- List<AlarmDetailsDto> update =new ArrayList<>();
- List<AlarmDetailsDto> delete =new ArrayList<>();
- //校验各参数异常情况
- for(DeviceCheckAlarmDto deviceCheckAlarmDto:deviceCheckAlarmDtos){
- //获取参数接收值
- Double receivedValue = receiveData.getDouble(deviceCheckAlarmDto.getIdentifiter());
- if(deviceCheckAlarmDto.checkdeviceAttributeAlarm(receivedValue)){
- //判断报警是否已存在
- if(alarmDetailsDtoMap.containsKey(deviceCheckAlarmDto.getAlarmSettingId())){
- AlarmDetailsDto alarmDetailsDto = alarmDetailsDtoMap.get(deviceCheckAlarmDto.getAlarmSettingId());
- alarmDetailsDto.setAlarmValue(receivedValue);
- alarmDetailsDto.setAlarmContent(deviceCheckAlarmDto.getAlarminfo(receivedValue));
- alarmDetailsDto.setMinValue(CalcUtil.compareBySign(alarmDetailsDto.getMinValue(),receivedValue, "<")?alarmDetailsDto.getMinValue():receivedValue);
- alarmDetailsDto.setMaxValue(CalcUtil.compareBySign(alarmDetailsDto.getMaxValue(),receivedValue,">")?alarmDetailsDto.getMaxValue():receivedValue);
- // update.add(alarmDetailsDto);
- alarmDetailsDto.setDateUpdate( new Date());
- alarmDetailMapper.update(alarmDetailsDto);
- //已存在的修改后从集合移除
- alarmDetailsDtoMap.remove(deviceCheckAlarmDto.getAlarmSettingId());
- }else{
- AlarmDetailsEntity alarmDetailsEntity = new AlarmDetailsDto();
- alarmDetailsEntity.setTenantId(deviceCheckAlarmDto.getTenantId());
- alarmDetailsEntity.setDeviceId(deviceCheckAlarmDto.getDeviceId());
- alarmDetailsEntity.setCompanyOrgId(deviceCheckAlarmDto.getCompanyOrgId());
- alarmDetailsEntity.setDeptOrgId(deviceCheckAlarmDto.getDeptOrgId());
- alarmDetailsEntity.setAlarmType(deviceCheckAlarmDto.getAlarmType());
- alarmDetailsEntity.setAttributeId(deviceCheckAlarmDto.getAttributeId());
- alarmDetailsEntity.setAlarmValue(receivedValue);
- alarmDetailsEntity.setAlarmStartTime(receiveDateTime);
- alarmDetailsEntity.setAlarmContent(deviceCheckAlarmDto.getAlarminfo(receivedValue));
- alarmDetailsEntity.setAlarmSettingId(deviceCheckAlarmDto.getAlarmSettingId());
- alarmDetailsEntity.setState(1);
- alarmDetailsEntity.setOpState(1);
- alarmDetailsEntity.setMinValue(receivedValue);
- alarmDetailsEntity.setMaxValue(receivedValue);
- alarmDetailsEntity.setStatus(1);
- alarmDetailsEntity.setDateCreate(new Date());
- alarmDetailsEntity.setDateUpdate(new Date());
- alarmDetailsEntity.setCreateBy("system");
- alarmDetailsEntity.setUpdateBy("system");
- insert.add(alarmDetailsEntity);
- }
- }
- }
- //处理完成后,剩下的标记为历史数据
- for(AlarmDetailsDto alarmDetailsDto:alarmDetailsDtoMap.values()){
- alarmDetailsDto.setState(0);
- alarmDetailsDto.setAlarmEndTime(receiveDateTime);
- alarmDetailsDto.setDateUpdate(new Date());
- alarmDetailMapper.update(alarmDetailsDto);
- }
- //批量插入新增报警
- if(insert.size()>0){
- alarmDetailMapper.batchInsert(insert);
- }
- }
- }
- }
- }
|