Jelajahi Sumber

Merge branch '20210223' of http://114.135.61.188:53000/UIMS/Code into 20210223

wangbo 4 tahun lalu
induk
melakukan
f40cdd5b0b

+ 4 - 2
operation_manager/src/main/java/com/huaxu/utils/EvaluationUtil.java

@@ -36,10 +36,11 @@ public class EvaluationUtil {
     }
 
     public static BigDecimal divide(Integer completeCount, Integer total){
+        String defaultValue = "0";
         BigDecimal bigDecimalComplete =  new BigDecimal(completeCount.toString());
         BigDecimal bigDecimalTotal =  new BigDecimal(total.toString());
-        if (bigDecimalTotal.compareTo(new BigDecimal("0")) == 0){
-            return new BigDecimal("0");
+        if (bigDecimalTotal.compareTo(new BigDecimal(defaultValue)) == 0){
+            return new BigDecimal(defaultValue);
         }
         return bigDecimalComplete.divide(bigDecimalTotal,2, RoundingMode.HALF_UP);
     }
@@ -164,6 +165,7 @@ public class EvaluationUtil {
                 return 2;
             case 10 :
                 return 3;
+            default: ;
         }
         return null;
     }

+ 111 - 0
sms_water/src/main/java/com/huaxu/quartz/service/MonitorReportJobService.java

@@ -0,0 +1,111 @@
+package com.huaxu.quartz.service;
+
+import com.huaxu.dao.MonitorDataMapper;
+import com.huaxu.entity.DayReportEntity;
+import com.huaxu.service.MonitorDataService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.scheduling.annotation.Async;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * 报表业务处理
+ * @author lihui
+ * @version V1.0
+ * @date 2021/5/18
+ **/
+@Component
+@Slf4j
+public class MonitorReportJobService {
+
+    @Resource
+    private MonitorDataService monitorDataService;
+
+    @Resource
+    private MonitorDataMapper monitorDataMapper;
+
+    /***
+     * 数据补充
+     * @Author lihui
+     * @Date 9:55 2021/5/18
+     * @param dateTime :   时间
+     * @param lastHourDataMap : 上一个小时的数据
+     * @return void
+     **/
+    public void replenishHistoryData(LocalDateTime dateTime, Map<String,DayReportEntity> lastHourDataMap){
+        List<Integer> deviceIds = null;
+        List<DayReportEntity> hourData = null;
+        List<DayReportEntity> lastHourData = null;
+        int total = 1;
+        // 最多向前补一个月的数据
+        int maxHour = 24 * 30;
+        while (true) {
+            if (total >= maxHour) {
+                break;
+            }
+            dateTime = dateTime.plusHours(-1);
+            // 设备id
+            deviceIds = lastHourDataMap.values().stream().map(d -> d.getDeviceId().intValue()).distinct().collect(Collectors.toList());
+            if (deviceIds.size() ==0) {
+                break;
+            }
+            // 前1小时有统计数据的设备id
+            List<Integer> deviceIdsIsExit = monitorDataMapper.checkReportDataExit(dateTime.getYear(),dateTime.getMonthValue(),dateTime.getDayOfMonth(),
+                    dateTime.getHour(),deviceIds);
+            // 需要补充数据的设备
+            deviceIds = deviceIds.stream().filter(deviceId -> !deviceIdsIsExit.contains(deviceId)).collect(Collectors.toList());
+            if (deviceIds.size() == 0) {
+                break;
+            }
+            // 查询需要补充数据的设备数据信息
+            hourData = lastHourDataMap.values().stream().filter(d -> !deviceIdsIsExit.contains(d.getDeviceId().intValue())).collect(Collectors.toList());
+            lastHourData   = monitorDataService.getMonitorDataGroupByHour(dateTime.plusHours(-1), deviceIds);
+            lastHourDataMap = lastHourData.stream().collect(Collectors.toMap(DayReportEntity::getMapkey, a -> a, (k1, k2) -> k1));
+            // 保存日报表数据
+            saveReportDataByHour(hourData, lastHourDataMap);
+            total++;
+        }
+
+    }
+
+    /***
+     * 数据保存
+     * @Author lihui
+     * @Date 9:55 2021/5/18
+     * @param hourData :
+     * @param lastHourDataMap :
+     * @return void
+     **/
+    public void saveReportDataByHour(List<DayReportEntity> hourData, Map<String, DayReportEntity> lastHourDataMap) {
+        if (hourData.size() == 0) {
+            return;
+        }
+        // 计算累计值
+        for (DayReportEntity dayReportEntity : hourData) {
+            if (dayReportEntity.getLatestValue() != null && lastHourDataMap.containsKey(dayReportEntity.getMapkey())) {
+                // 上一个小时有值
+                if (lastHourDataMap.get(dayReportEntity.getMapkey()).getLatestValue() != null) {
+                    dayReportEntity.setSumValue(new BigDecimal(dayReportEntity.getLatestValue().toString()).subtract(new BigDecimal(lastHourDataMap.get(dayReportEntity.getMapkey()).getLatestValue().toString())).doubleValue());
+                } else {
+                    // 上一个小时没有值,取本小时的初始值
+                    if (dayReportEntity.getFirstValue() != null) {
+                        dayReportEntity.setSumValue(new BigDecimal(dayReportEntity.getLatestValue().toString()).subtract(new BigDecimal(dayReportEntity.getFirstValue().toString())).doubleValue());
+                    }
+                    lastHourDataMap.remove(dayReportEntity.getMapkey());
+                }
+            }
+            // 对平均值小数位处理
+            dayReportEntity.setAvgValue(new BigDecimal(dayReportEntity.getAvgValue()).setScale(5, BigDecimal.ROUND_HALF_UP).doubleValue());
+        }
+        monitorDataMapper.batchInsertDayReport(hourData);
+    }
+
+
+}

+ 2 - 5
sms_water/src/main/java/com/huaxu/rabbitmq/ReceiveClearData.java

@@ -76,9 +76,10 @@ public class ReceiveClearData {
     }
 
     public void receivedDataHandle(byte[] receivedData){
-        long begin = System.currentTimeMillis();
         JSONObject jsonObject = JSONObject.parseObject(new String(receivedData));
         String eventTime      = jsonObject.getString("eventTime");
+        String deviceCode     = jsonObject.getString("unitIdentifier");
+        log.info("【数据采集】deviceCode : {},时间: {}", deviceCode ,eventTime);
         // 对象有时间、有设备编码、有数据则解析,任何一个位空直接退出
         if (!jsonObject.containsKey("eventTime") || !jsonObject.containsKey("unitIdentifier") ||
                 !jsonObject.containsKey("parsedData")) {
@@ -89,8 +90,6 @@ public class ReceiveClearData {
             log.error("报警信息查询时间转换错误,原数据:eventTime:{}", eventTime);
             return;
         }
-
-        String deviceCode      = jsonObject.getString("unitIdentifier");
         JSONObject receiveData = JSONObject.parseObject(jsonObject .getString("parsedData"));
         if (StringUtils.isBlank(deviceCode)) {
             log.error("deviceCode为空退出。");
@@ -131,8 +130,6 @@ public class ReceiveClearData {
         reportWaterPumpStateHandler.handler(monitorDataEntity, receiveData, receiveDateTime);
         // 异步处理报警信息
         alarmDataHandler.hanlder(monitorDataEntity, receiveData, receiveDateTime);
-        long end = System.currentTimeMillis();
-        log.info("rabbitmq队列消息处理完成,耗时:" + (end-begin) + "毫秒");
     }
 
 

+ 41 - 84
sms_water/src/main/java/com/huaxu/service/impl/MonitorDataServiceImpl.java

@@ -8,12 +8,11 @@ import com.huaxu.quartz.job.MonitorDataReportByDayJob;
 import com.huaxu.quartz.job.MonitorDataReportByMonthJob;
 import com.huaxu.quartz.job.MonitorDataReportByYearJob;
 import com.huaxu.quartz.service.JobAndTriggerService;
+import com.huaxu.quartz.service.MonitorReportJobService;
 import com.huaxu.service.MonitorDataService;
 import com.huaxu.util.ByteArrayUtils;
 import com.huaxu.util.RedisUtil;
-import lombok.Data;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.commons.lang3.StringUtils;
 import org.springframework.beans.factory.InitializingBean;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.data.domain.Sort;
@@ -27,17 +26,13 @@ import org.springframework.data.mongodb.core.query.Update;
 import org.springframework.stereotype.Service;
 
 import javax.annotation.Resource;
-import java.math.BigDecimal;
-import java.time.Duration;
-import java.time.LocalDate;
 import java.time.LocalDateTime;
-import java.time.format.DateTimeFormatter;
 import java.util.*;
 import java.util.stream.Collectors;
 
 /**
  * @description
- * @auto wangli
+ * @author  wangli
  * @data 2020-12-01 11:09
  */
 @Service
@@ -56,7 +51,9 @@ public class MonitorDataServiceImpl implements MonitorDataService , Initializing
     @Autowired
     private JobAndTriggerService jobAndTriggerService;
 
-    //避免同时执行先后顺序造成取数据不准确,时间隔开执行
+    @Autowired
+    private MonitorReportJobService monitorReportJobServcie;
+
     @Override
     public void afterPropertiesSet() {
         saveQrtzTask("0 0 */1 * * ? ","日报生成任务","smsWaterMonitorDataReportByDayJob",MonitorDataReportByDayJob.class.getName());
@@ -64,9 +61,17 @@ public class MonitorDataServiceImpl implements MonitorDataService , Initializing
         saveQrtzTask("0 10 0 1 * ? ","年报生成任务","smsWaterMonitorDataReportByYearJob", MonitorDataReportByYearJob.class.getName());
     }
 
-    public void saveQrtzTask(String cron, String jobGroup ,String jobName,String JobClassName) {
-        // 1,查询需要批量推送的配置项目并构建定时任务
-        // 2,若对应定时任务不存在则创建
+    /***
+     * 1,查询需要批量推送的配置项目并构建定时任务
+     * 2,若对应定时任务不存在则创建
+     * @Author wangli
+     * @param cron :
+     * @param jobGroup :
+     * @param jobName :
+     * @param jobClassName :
+     * @return void
+     **/
+    public void saveQrtzTask(String cron, String jobGroup ,String jobName,String jobClassName) {
         QuartzEntity entity = new QuartzEntity();
         entity.setJobGroup(jobGroup);
         entity.setJobName(jobName);
@@ -74,23 +79,24 @@ public class MonitorDataServiceImpl implements MonitorDataService , Initializing
         boolean exists = jobAndTriggerService.isExists(entity);
         if(!exists){
             entity.setCronExpression(cron);
-            entity.setJobClassName(JobClassName);
+            entity.setJobClassName(jobClassName);
             jobAndTriggerService.save(entity);
         }
     }
 
     @Override
     public MonitorDataEntity save(MonitorDataEntity monitorDataEntity) {
-        //缓存数据
+        // 缓存数据
         redisUtil.setExpire(("sms_water_"+monitorDataEntity.getDeviceCode()).getBytes(), ByteArrayUtils.objectToBytes(monitorDataEntity).get());
-        //把没有数据的属性去掉再保存
+        // 把没有数据的属性去掉再保存
         monitorDataEntity.setDataValues(monitorDataEntity.getDataValues().stream().filter(m -> m.getDataValue()!=null).collect(Collectors.toList()));
 
         return mongoTemplate.save(monitorDataEntity);
     }
 
+    @Override
     public MonitorDataEntity getDeviceMonitorInfoByDeviceCode(String deviceCode){
-        //先取缓存里的数据
+        // 先取缓存里的数据
         byte[] bytes = redisUtil.get(("sms_water_"+deviceCode).getBytes());
         if(bytes != null && bytes.length>0){
             return (MonitorDataEntity)ByteArrayUtils.bytesToObject(bytes).get();
@@ -106,11 +112,8 @@ public class MonitorDataServiceImpl implements MonitorDataService , Initializing
     @Override
     public void update(MonitorDataEntity monitorDataEntity) {
         Query query = new Query(Criteria.where("id").is(monitorDataEntity.getId()));
-
         Update update = new Update();
         update.set("deviceName", monitorDataEntity.getDeviceName());
-
-
         mongoTemplate.updateFirst(query, update, MonitorDataEntity.class);
     }
 
@@ -123,8 +126,7 @@ public class MonitorDataServiceImpl implements MonitorDataService , Initializing
 
     @Override
     public List<DayReportEntity> getMonitorDataGroupByHour(LocalDateTime dateTime ,List<Integer> deviceIds ){
-
-        if(dateTime == null ){
+        if (dateTime == null ){
             return null;
         }
         Criteria criteria = new Criteria();
@@ -137,9 +139,12 @@ public class MonitorDataServiceImpl implements MonitorDataService , Initializing
         }
         AggregationOptions aggregationOptions = AggregationOptions.builder().allowDiskUse(true).build();
         Aggregation agg = Aggregation.newAggregation(
-                Aggregation.match(criteria),    //查询条件
-                Aggregation.unwind("dataValues"),//将文档中的某一个数组类型字段拆分成多条,每条包含数组中的一个值
-                Aggregation.sort(Sort.Direction.ASC, "collectDate"),//在聚合之前对数据进行排序
+                // 查询条件
+                Aggregation.match(criteria),
+                // 将文档中的某一个数组类型字段拆分成多条,每条包含数组中的一个值
+                Aggregation.unwind("dataValues"),
+                // 在聚合之前对数据进行排序
+                Aggregation.sort(Sort.Direction.ASC, "collectDate"),
                 Aggregation.group("year","month","day","hour","tenantId","deviceId","deviceName","deviceCode","dataValues.attributeId","dataValues.attributeName" )
                         .min("$dataValues.dataValue").as("minValue")
                         .max("$dataValues.dataValue").as("maxValue")
@@ -152,75 +157,25 @@ public class MonitorDataServiceImpl implements MonitorDataService , Initializing
         ).withOptions(aggregationOptions);
         AggregationResults<DayReportEntity> durationData =
                 mongoTemplate.aggregate(agg, "SMS_MONITOR_DATA", DayReportEntity.class);
-        //获取查询结果
-        List<DayReportEntity> monthReportEntities = durationData.getMappedResults();
-        return monthReportEntities;
+        return durationData.getMappedResults();
     }
 
     @Override
     public void getMonitorDataReportByHour(){
-    /*    LocalDateTime dateTime =
-                LocalDateTime.parse("2021-04-19 14:59:58", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
-*/
-        //取前一个小时的时间
+        // 取前一个小时的时间
         LocalDateTime dateTime = LocalDateTime.now().plusHours(-1);
-
         // 取前一个小时的时间(取19点) sumValue  -> 1-10,2-20
-        List<DayReportEntity> hourDatas = getMonitorDataGroupByHour(dateTime,null);
+        List<DayReportEntity> hourData     = getMonitorDataGroupByHour(dateTime,null);
         // 取前一个小时的时间(取18点)
-        List<DayReportEntity> lastHourDatas = getMonitorDataGroupByHour(dateTime.plusHours(-1),null);
-
-        Map<String,DayReportEntity> lastHourDataMap = lastHourDatas.stream().collect(Collectors.toMap(DayReportEntity::getMapkey, a -> a,(k1, k2)->k1));
-        //保存日报表数据
-        saveReportDataByHour(hourDatas, lastHourDataMap);
-
-        //补数据,前24小时没有统计数据的
-        List<Integer> deviceIds ;
-        if(lastHourDataMap.values().size()>0){
-            for(int i=1;i<24;i++){
-                dateTime = dateTime.plusHours(-1);
-                //设备id
-                deviceIds = lastHourDataMap.values().stream().map(d -> d.getDeviceId().intValue()).distinct().collect(Collectors.toList());
-                if(deviceIds.size() ==0){break;}
-
-                //前1小时有统计数据的设备id
-                List<Integer> deviceIdsIsExit = monitorDataMapper.checkReportDataExit(dateTime.getYear(),dateTime.getMonthValue(),dateTime.getDayOfMonth(),dateTime.getHour(),deviceIds);
-                //需要补充数据的设备
-                deviceIds = deviceIds.stream().filter(deviceId -> !deviceIdsIsExit.contains(deviceId)).collect(Collectors.toList());
-                if(deviceIds.size() ==0){break;}
-
-                //查询需要补充数据的设备数据信息
-                hourDatas = lastHourDataMap.values().stream().filter(d -> !deviceIdsIsExit.contains(d.getDeviceId().intValue())).collect(Collectors.toList());
-
-                lastHourDatas = getMonitorDataGroupByHour(dateTime.plusHours(-1), deviceIds);
-                lastHourDataMap = lastHourDatas.stream().collect(Collectors.toMap(DayReportEntity::getMapkey, a -> a, (k1, k2) -> k1));
-                //保存日报表数据
-                saveReportDataByHour(hourDatas, lastHourDataMap);
-            }
-        }
-
-    }
-    //保存日报表数据
-    private void saveReportDataByHour(List<DayReportEntity> hourDatas, Map<String, DayReportEntity> lastHourDataMap) {
-        if (hourDatas.size() > 0) {
-            //计算累计值
-            for (DayReportEntity dayReportEntity : hourDatas) {
-                if (dayReportEntity.getLatestValue() != null && lastHourDataMap.containsKey(dayReportEntity.getMapkey())) {
-                    //上一个小时有值
-                    if (lastHourDataMap.get(dayReportEntity.getMapkey()).getLatestValue() != null) {
-                        dayReportEntity.setSumValue(new BigDecimal(dayReportEntity.getLatestValue().toString()).subtract(new BigDecimal(lastHourDataMap.get(dayReportEntity.getMapkey()).getLatestValue().toString())).doubleValue());
-                    } else {
-                        if (dayReportEntity.getFirstValue() != null) {//上一个小时没有值,取本小时的初始值
-                            dayReportEntity.setSumValue(new BigDecimal(dayReportEntity.getLatestValue().toString()).subtract(new BigDecimal(dayReportEntity.getFirstValue().toString())).doubleValue());
-                        }
-                        lastHourDataMap.remove(dayReportEntity.getMapkey());
-                    }
-                }
-                //对平均值小数位处理
-                dayReportEntity.setAvgValue(new BigDecimal(dayReportEntity.getAvgValue()).setScale(5, BigDecimal.ROUND_HALF_UP).doubleValue());
-            }
-            monitorDataMapper.batchInsertDayReport(hourDatas);
+        List<DayReportEntity> lastHourData = getMonitorDataGroupByHour(dateTime.plusHours(-1),null);
+        Map<String,DayReportEntity> lastHourDataMap = lastHourData.stream().collect(Collectors.toMap(DayReportEntity::getMapkey, a -> a,(k1, k2)->k1));
+        // 保存日报表数据
+        monitorReportJobServcie.saveReportDataByHour(hourData, lastHourDataMap);
+        if (lastHourDataMap.values().size() == 0){
+            return;
         }
+        // 补充数据
+        monitorReportJobServcie.replenishHistoryData(dateTime, lastHourDataMap);
     }
 
     @Override
@@ -239,4 +194,6 @@ public class MonitorDataServiceImpl implements MonitorDataService , Initializing
         monitorDataMapper.batchInsertYearReport(dateTime.getYear(),dateTime.getMonthValue());
     }
 
+
+
 }