123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195 |
- package com.huaxu.rabbitmq;
- import cn.hutool.core.collection.CollectionUtil;
- import com.alibaba.fastjson.JSONObject;
- import com.huaxu.dao.DeviceAttributeSpecsMapper;
- import com.huaxu.dao.ReportWaterPumpStateMapper;
- import com.huaxu.dto.AlarmDetailsDto;
- import com.huaxu.dto.watePump.ReprotWaterPumpQueryDto;
- import com.huaxu.entity.DeviceAttributeSpecsEntity;
- import com.huaxu.entity.MonitorDataEntity;
- import com.huaxu.entity.MonitorDataValueEntity;
- import com.huaxu.entity.ReportWaterPumpStateEntity;
- import com.huaxu.util.DatesUtil;
- import com.huaxu.util.RedisUtil;
- import io.swagger.models.auth.In;
- import lombok.extern.slf4j.Slf4j;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.scheduling.annotation.Async;
- import org.springframework.stereotype.Component;
- import java.math.BigDecimal;
- import java.net.InetAddress;
- import java.net.UnknownHostException;
- import java.util.*;
- import java.util.stream.Collectors;
- /**
- * @ClassName ReportWaterPumpStateHandler
- * @Description: 水泵运行状态报表业务处理
- * 清洗数据上报,条件:属性参数类型为水泵状态(2)
- * 清洗规则:以分钟为单位计算水泵是否运行,如水泵从00:23:22开始运行, 到01:32:22是停止,
- * 则认为00:23-01:32分为水泵运行状态;
- * 如果数据存在跨天,需要重新新增一条新的数据
- * @Author lihui
- * @Date 2021/4/20
- * @Version V1.0
- **/
- @Component
- @Slf4j
- public class ReportWaterPumpStateHandler {
- private static String IP;
- @Autowired
- private ReportWaterPumpStateMapper reportWaterPumpStateMapper;
- @Autowired
- private DeviceAttributeSpecsMapper deviceAttributeSpecsMapper;
- @Async
- public void handler(MonitorDataEntity monitorDataEntity, JSONObject receiveData, Date receiveDateTime){
- log.info("【水泵运行状态报表:{}】开始处理", monitorDataEntity.getDeviceCode());
- // 0 运行, 1停止
- int state = 0;
- // 执行批量插入操作
- DeviceAttributeSpecsEntity entity = new DeviceAttributeSpecsEntity();
- List<ReportWaterPumpStateEntity> insertList = new ArrayList<>();
- ReprotWaterPumpQueryDto queryDto = getReprotWaterPumpQueryDto(monitorDataEntity);
- // 转换常量,保证获取是同一个对象
- // 避免同一时间内同一个code上报多条数据,保证数据更新和插入一致性
- // TODO 需要采用分布式锁解决,避免部署多台服务器
- synchronized (monitorDataEntity.getDeviceCode().intern()) {
- for (MonitorDataValueEntity valueEntity : monitorDataEntity.getDataValues()) {
- if (!receiveData.containsKey(valueEntity.getIdentifier())){
- continue;
- }
- entity.setAttributeId(valueEntity.getAttributeId());
- // 查找属性ID是否有设置运行、停止状态
- List<DeviceAttributeSpecsEntity> list = deviceAttributeSpecsMapper.findList(entity);
- if (CollectionUtil.isEmpty(list)) {
- continue;
- }
- List<String> listSate = toList(list);
- if (CollectionUtil.isEmpty(listSate)) {
- continue;
- }
- BigDecimal stateValue = new BigDecimal(receiveData.getDouble(valueEntity.getIdentifier()));
- state = listSate.contains(stateValue.toString()) ? 0 : 1;
- queryDto.setAttributeId(valueEntity.getAttributeId());
- addOrUpdate(queryDto.getMd5Query(), monitorDataEntity, valueEntity, insertList, state, receiveDateTime);
- }
- // 最后一次全部插入
- if (insertList.size() > 0 ){
- reportWaterPumpStateMapper.batchInsertReportWaterPumpState(insertList);
- }
- }
- log.info("【水泵运行状态报表:{}】结束处理,新增:{}", monitorDataEntity.getDeviceCode(),insertList.size());
- }
- private List<String> toList(List<DeviceAttributeSpecsEntity> list){
- List<String> result = new ArrayList<>();
- for (DeviceAttributeSpecsEntity deviceAttributeSpecsEntity: list) {
- if (deviceAttributeSpecsEntity.getSpecsName() != null &&
- deviceAttributeSpecsEntity.getSpecsName().indexOf("运行") != -1) {
- result.add(deviceAttributeSpecsEntity.getSpecsValue());
- }
- }
- return result;
- }
- /**
- * @Author lihui
- * @Description 如果查询到数据,根据返回的state来判断数据
- * 如果返回的state和上报的state同一个状态,说明该状态一直在持续,我们不需要做任何的操作
- * 如果返回的state和上报的state不是同一个状态,说明state已经改变,我们需要更新上一次state所持续的最后时间,并新增一条目前的state的开始时间
- * 公式:同一id-> 00:23-00:28 运行时间
- * -> 00:28-00:30 停止时间
- * -> 00:30-null 运行时间
- * @Date 14:08 2021/4/21
- * @Param [md5Query :查询条件, monitorDataEntity, valueEntity, inserList :需要插入的数据集合, stateValue:泵站状态,receiveDateTime:上报数据的时间]
- * @return
- **/
- private void addOrUpdate(String md5Query , MonitorDataEntity monitorDataEntity,
- MonitorDataValueEntity valueEntity ,List<ReportWaterPumpStateEntity> inserList,
- int stateValue, Date receiveDateTime){
- ReportWaterPumpStateEntity stateEntity = reportWaterPumpStateMapper.findReportWaterPumpState(md5Query);
- if (stateEntity == null) {
- inserList.add(getReportWaterPumpStateEntity(monitorDataEntity, valueEntity, receiveDateTime, stateValue));
- return;
- }
- Date beginTime = null;
- // 计算出时间差 等于0表示同一天的数据,大于0表示跨天
- int day = timeDifference(stateEntity.getStateBeginTime(), receiveDateTime);
- boolean theSameSate = stateEntity.getState().intValue() == stateValue;
- // 跨天需要新增一条数据,如果是跨一天的数据起始时间为 receiveDateTime + 00:00:00, 跨多天就按当前上报的时间
- if (day != 0) {
- beginTime = day == 1 ? parseDate(receiveDateTime, "00:00:00") : receiveDateTime;
- reportWaterPumpStateMapper.updateReportWaterPumpState(stateEntity.getId(), parseDate(stateEntity.getStateBeginTime(), "23:59:59"), new Date());
- inserList.add(getReportWaterPumpStateEntity(monitorDataEntity, valueEntity, beginTime, stateValue));
- return;
- }
- // 同一个状态的数据
- if (theSameSate) {
- return;
- }
- reportWaterPumpStateMapper.updateReportWaterPumpState(stateEntity.getId(), receiveDateTime, new Date());
- inserList.add(getReportWaterPumpStateEntity(monitorDataEntity, valueEntity, receiveDateTime, stateValue));
- }
- private ReprotWaterPumpQueryDto getReprotWaterPumpQueryDto(MonitorDataEntity monitorDataEntity){
- ReprotWaterPumpQueryDto queryDto = new ReprotWaterPumpQueryDto();
- queryDto.setYear(monitorDataEntity.getYear());
- queryDto.setMonth(monitorDataEntity.getMonth());
- queryDto.setDay(monitorDataEntity.getDay());
- queryDto.setTenantId(monitorDataEntity.getTenantId());
- queryDto.setDeviceId(monitorDataEntity.getDeviceId());
- queryDto.setDeviceCode(monitorDataEntity.getDeviceCode());
- return queryDto;
- }
- private ReportWaterPumpStateEntity getReportWaterPumpStateEntity(MonitorDataEntity monitorDataEntity, MonitorDataValueEntity monitorDataValueEntity,
- Date receiveDateTime, Integer state){
- Date nowTime = new Date();
- ReportWaterPumpStateEntity entity = new ReportWaterPumpStateEntity();
- entity.setState(state);
- entity.setTenantId(monitorDataEntity.getTenantId());
- entity.setDeviceId(monitorDataEntity.getDeviceId());
- entity.setDeviceCode(monitorDataEntity.getDeviceCode());
- entity.setDeviceName(monitorDataEntity.getDeviceName());
- entity.setYear(monitorDataEntity.getYear());
- entity.setMonth(monitorDataEntity.getMonth());
- entity.setDay(monitorDataEntity.getDay());
- entity.setAttributeId(monitorDataValueEntity.getAttributeId());
- entity.setAttributeName(monitorDataValueEntity.getAttributeName());
- entity.setStateBeginTime(receiveDateTime);
- entity.setDateCreate(nowTime);
- entity.setDateUpdate(nowTime);
- entity.setMd5Query(entity.getMd5());
- entity.setIp(getIp());
- return entity;
- }
- private int timeDifference(Date beginTime, Date endTime){
- return Integer.parseInt(DatesUtil.formatDate(endTime, "yyyyMMdd")) - Integer.parseInt(DatesUtil.formatDate(beginTime, "yyyyMMdd"));
- }
- private Date parseDate(Date date, String appendTo){
- return DatesUtil.parseDate(DatesUtil.formatDate(date, "yyyy-MM-dd") + " " + appendTo,
- "yyyy-MM-dd HH:mm:ss");
- }
- public String getIp(){
- try {
- if (IP == null) {
- IP = InetAddress.getLocalHost().getHostAddress();
- }
- } catch (UnknownHostException e) {
- e.printStackTrace();
- }
- return IP;
- }
- }
|