|
@@ -8,12 +8,11 @@ import com.huaxu.quartz.job.MonitorDataReportByDayJob;
|
|
|
import com.huaxu.quartz.job.MonitorDataReportByMonthJob;
|
|
|
import com.huaxu.quartz.job.MonitorDataReportByYearJob;
|
|
|
import com.huaxu.quartz.service.JobAndTriggerService;
|
|
|
+import com.huaxu.quartz.service.MonitorReportJobService;
|
|
|
import com.huaxu.service.MonitorDataService;
|
|
|
import com.huaxu.util.ByteArrayUtils;
|
|
|
import com.huaxu.util.RedisUtil;
|
|
|
-import lombok.Data;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.apache.commons.lang3.StringUtils;
|
|
|
import org.springframework.beans.factory.InitializingBean;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
|
import org.springframework.data.domain.Sort;
|
|
@@ -27,17 +26,13 @@ import org.springframework.data.mongodb.core.query.Update;
|
|
|
import org.springframework.stereotype.Service;
|
|
|
|
|
|
import javax.annotation.Resource;
|
|
|
-import java.math.BigDecimal;
|
|
|
-import java.time.Duration;
|
|
|
-import java.time.LocalDate;
|
|
|
import java.time.LocalDateTime;
|
|
|
-import java.time.format.DateTimeFormatter;
|
|
|
import java.util.*;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
|
* @description
|
|
|
- * @auto wangli
|
|
|
+ * @author wangli
|
|
|
* @data 2020-12-01 11:09
|
|
|
*/
|
|
|
@Service
|
|
@@ -56,7 +51,9 @@ public class MonitorDataServiceImpl implements MonitorDataService , Initializing
|
|
|
@Autowired
|
|
|
private JobAndTriggerService jobAndTriggerService;
|
|
|
|
|
|
- //避免同时执行先后顺序造成取数据不准确,时间隔开执行
|
|
|
+ @Autowired
|
|
|
+ private MonitorReportJobService monitorReportJobServcie;
|
|
|
+
|
|
|
@Override
|
|
|
public void afterPropertiesSet() {
|
|
|
saveQrtzTask("0 0 */1 * * ? ","日报生成任务","smsWaterMonitorDataReportByDayJob",MonitorDataReportByDayJob.class.getName());
|
|
@@ -64,9 +61,17 @@ public class MonitorDataServiceImpl implements MonitorDataService , Initializing
|
|
|
saveQrtzTask("0 10 0 1 * ? ","年报生成任务","smsWaterMonitorDataReportByYearJob", MonitorDataReportByYearJob.class.getName());
|
|
|
}
|
|
|
|
|
|
- public void saveQrtzTask(String cron, String jobGroup ,String jobName,String JobClassName) {
|
|
|
- // 1,查询需要批量推送的配置项目并构建定时任务
|
|
|
- // 2,若对应定时任务不存在则创建
|
|
|
+ /***
|
|
|
+ * 1,查询需要批量推送的配置项目并构建定时任务
|
|
|
+ * 2,若对应定时任务不存在则创建
|
|
|
+ * @Author wangli
|
|
|
+ * @param cron :
|
|
|
+ * @param jobGroup :
|
|
|
+ * @param jobName :
|
|
|
+ * @param jobClassName :
|
|
|
+ * @return void
|
|
|
+ **/
|
|
|
+ public void saveQrtzTask(String cron, String jobGroup ,String jobName,String jobClassName) {
|
|
|
QuartzEntity entity = new QuartzEntity();
|
|
|
entity.setJobGroup(jobGroup);
|
|
|
entity.setJobName(jobName);
|
|
@@ -74,23 +79,24 @@ public class MonitorDataServiceImpl implements MonitorDataService , Initializing
|
|
|
boolean exists = jobAndTriggerService.isExists(entity);
|
|
|
if(!exists){
|
|
|
entity.setCronExpression(cron);
|
|
|
- entity.setJobClassName(JobClassName);
|
|
|
+ entity.setJobClassName(jobClassName);
|
|
|
jobAndTriggerService.save(entity);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public MonitorDataEntity save(MonitorDataEntity monitorDataEntity) {
|
|
|
- //缓存数据
|
|
|
+ // 缓存数据
|
|
|
redisUtil.setExpire(("sms_water_"+monitorDataEntity.getDeviceCode()).getBytes(), ByteArrayUtils.objectToBytes(monitorDataEntity).get());
|
|
|
- //把没有数据的属性去掉再保存
|
|
|
+ // 把没有数据的属性去掉再保存
|
|
|
monitorDataEntity.setDataValues(monitorDataEntity.getDataValues().stream().filter(m -> m.getDataValue()!=null).collect(Collectors.toList()));
|
|
|
|
|
|
return mongoTemplate.save(monitorDataEntity);
|
|
|
}
|
|
|
|
|
|
+ @Override
|
|
|
public MonitorDataEntity getDeviceMonitorInfoByDeviceCode(String deviceCode){
|
|
|
- //先取缓存里的数据
|
|
|
+ // 先取缓存里的数据
|
|
|
byte[] bytes = redisUtil.get(("sms_water_"+deviceCode).getBytes());
|
|
|
if(bytes != null && bytes.length>0){
|
|
|
return (MonitorDataEntity)ByteArrayUtils.bytesToObject(bytes).get();
|
|
@@ -106,11 +112,8 @@ public class MonitorDataServiceImpl implements MonitorDataService , Initializing
|
|
|
@Override
|
|
|
public void update(MonitorDataEntity monitorDataEntity) {
|
|
|
Query query = new Query(Criteria.where("id").is(monitorDataEntity.getId()));
|
|
|
-
|
|
|
Update update = new Update();
|
|
|
update.set("deviceName", monitorDataEntity.getDeviceName());
|
|
|
-
|
|
|
-
|
|
|
mongoTemplate.updateFirst(query, update, MonitorDataEntity.class);
|
|
|
}
|
|
|
|
|
@@ -123,8 +126,7 @@ public class MonitorDataServiceImpl implements MonitorDataService , Initializing
|
|
|
|
|
|
@Override
|
|
|
public List<DayReportEntity> getMonitorDataGroupByHour(LocalDateTime dateTime ,List<Integer> deviceIds ){
|
|
|
-
|
|
|
- if(dateTime == null ){
|
|
|
+ if (dateTime == null ){
|
|
|
return null;
|
|
|
}
|
|
|
Criteria criteria = new Criteria();
|
|
@@ -137,9 +139,12 @@ public class MonitorDataServiceImpl implements MonitorDataService , Initializing
|
|
|
}
|
|
|
AggregationOptions aggregationOptions = AggregationOptions.builder().allowDiskUse(true).build();
|
|
|
Aggregation agg = Aggregation.newAggregation(
|
|
|
- Aggregation.match(criteria), //查询条件
|
|
|
- Aggregation.unwind("dataValues"),//将文档中的某一个数组类型字段拆分成多条,每条包含数组中的一个值
|
|
|
- Aggregation.sort(Sort.Direction.ASC, "collectDate"),//在聚合之前对数据进行排序
|
|
|
+ // 查询条件
|
|
|
+ Aggregation.match(criteria),
|
|
|
+ // 将文档中的某一个数组类型字段拆分成多条,每条包含数组中的一个值
|
|
|
+ Aggregation.unwind("dataValues"),
|
|
|
+ // 在聚合之前对数据进行排序
|
|
|
+ Aggregation.sort(Sort.Direction.ASC, "collectDate"),
|
|
|
Aggregation.group("year","month","day","hour","tenantId","deviceId","deviceName","deviceCode","dataValues.attributeId","dataValues.attributeName" )
|
|
|
.min("$dataValues.dataValue").as("minValue")
|
|
|
.max("$dataValues.dataValue").as("maxValue")
|
|
@@ -152,75 +157,25 @@ public class MonitorDataServiceImpl implements MonitorDataService , Initializing
|
|
|
).withOptions(aggregationOptions);
|
|
|
AggregationResults<DayReportEntity> durationData =
|
|
|
mongoTemplate.aggregate(agg, "SMS_MONITOR_DATA", DayReportEntity.class);
|
|
|
- //获取查询结果
|
|
|
- List<DayReportEntity> monthReportEntities = durationData.getMappedResults();
|
|
|
- return monthReportEntities;
|
|
|
+ return durationData.getMappedResults();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void getMonitorDataReportByHour(){
|
|
|
- /* LocalDateTime dateTime =
|
|
|
- LocalDateTime.parse("2021-04-19 14:59:58", DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
|
|
|
-*/
|
|
|
- //取前一个小时的时间
|
|
|
+ // 取前一个小时的时间
|
|
|
LocalDateTime dateTime = LocalDateTime.now().plusHours(-1);
|
|
|
-
|
|
|
// 取前一个小时的时间(取19点) sumValue -> 1-10,2-20
|
|
|
- List<DayReportEntity> hourDatas = getMonitorDataGroupByHour(dateTime,null);
|
|
|
+ List<DayReportEntity> hourData = getMonitorDataGroupByHour(dateTime,null);
|
|
|
// 取前一个小时的时间(取18点)
|
|
|
- List<DayReportEntity> lastHourDatas = getMonitorDataGroupByHour(dateTime.plusHours(-1),null);
|
|
|
-
|
|
|
- Map<String,DayReportEntity> lastHourDataMap = lastHourDatas.stream().collect(Collectors.toMap(DayReportEntity::getMapkey, a -> a,(k1, k2)->k1));
|
|
|
- //保存日报表数据
|
|
|
- saveReportDataByHour(hourDatas, lastHourDataMap);
|
|
|
-
|
|
|
- //补数据,前24小时没有统计数据的
|
|
|
- List<Integer> deviceIds ;
|
|
|
- if(lastHourDataMap.values().size()>0){
|
|
|
- for(int i=1;i<24;i++){
|
|
|
- dateTime = dateTime.plusHours(-1);
|
|
|
- //设备id
|
|
|
- deviceIds = lastHourDataMap.values().stream().map(d -> d.getDeviceId().intValue()).distinct().collect(Collectors.toList());
|
|
|
- if(deviceIds.size() ==0){break;}
|
|
|
-
|
|
|
- //前1小时有统计数据的设备id
|
|
|
- List<Integer> deviceIdsIsExit = monitorDataMapper.checkReportDataExit(dateTime.getYear(),dateTime.getMonthValue(),dateTime.getDayOfMonth(),dateTime.getHour(),deviceIds);
|
|
|
- //需要补充数据的设备
|
|
|
- deviceIds = deviceIds.stream().filter(deviceId -> !deviceIdsIsExit.contains(deviceId)).collect(Collectors.toList());
|
|
|
- if(deviceIds.size() ==0){break;}
|
|
|
-
|
|
|
- //查询需要补充数据的设备数据信息
|
|
|
- hourDatas = lastHourDataMap.values().stream().filter(d -> !deviceIdsIsExit.contains(d.getDeviceId().intValue())).collect(Collectors.toList());
|
|
|
-
|
|
|
- lastHourDatas = getMonitorDataGroupByHour(dateTime.plusHours(-1), deviceIds);
|
|
|
- lastHourDataMap = lastHourDatas.stream().collect(Collectors.toMap(DayReportEntity::getMapkey, a -> a, (k1, k2) -> k1));
|
|
|
- //保存日报表数据
|
|
|
- saveReportDataByHour(hourDatas, lastHourDataMap);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- }
|
|
|
- //保存日报表数据
|
|
|
- private void saveReportDataByHour(List<DayReportEntity> hourDatas, Map<String, DayReportEntity> lastHourDataMap) {
|
|
|
- if (hourDatas.size() > 0) {
|
|
|
- //计算累计值
|
|
|
- for (DayReportEntity dayReportEntity : hourDatas) {
|
|
|
- if (dayReportEntity.getLatestValue() != null && lastHourDataMap.containsKey(dayReportEntity.getMapkey())) {
|
|
|
- //上一个小时有值
|
|
|
- if (lastHourDataMap.get(dayReportEntity.getMapkey()).getLatestValue() != null) {
|
|
|
- dayReportEntity.setSumValue(new BigDecimal(dayReportEntity.getLatestValue().toString()).subtract(new BigDecimal(lastHourDataMap.get(dayReportEntity.getMapkey()).getLatestValue().toString())).doubleValue());
|
|
|
- } else {
|
|
|
- if (dayReportEntity.getFirstValue() != null) {//上一个小时没有值,取本小时的初始值
|
|
|
- dayReportEntity.setSumValue(new BigDecimal(dayReportEntity.getLatestValue().toString()).subtract(new BigDecimal(dayReportEntity.getFirstValue().toString())).doubleValue());
|
|
|
- }
|
|
|
- lastHourDataMap.remove(dayReportEntity.getMapkey());
|
|
|
- }
|
|
|
- }
|
|
|
- //对平均值小数位处理
|
|
|
- dayReportEntity.setAvgValue(new BigDecimal(dayReportEntity.getAvgValue()).setScale(5, BigDecimal.ROUND_HALF_UP).doubleValue());
|
|
|
- }
|
|
|
- monitorDataMapper.batchInsertDayReport(hourDatas);
|
|
|
+ List<DayReportEntity> lastHourData = getMonitorDataGroupByHour(dateTime.plusHours(-1),null);
|
|
|
+ Map<String,DayReportEntity> lastHourDataMap = lastHourData.stream().collect(Collectors.toMap(DayReportEntity::getMapkey, a -> a,(k1, k2)->k1));
|
|
|
+ // 保存日报表数据
|
|
|
+ monitorReportJobServcie.saveReportDataByHour(hourData, lastHourDataMap);
|
|
|
+ if (lastHourDataMap.values().size() == 0){
|
|
|
+ return;
|
|
|
}
|
|
|
+ // 补充数据
|
|
|
+ monitorReportJobServcie.replenishHistoryData(dateTime, lastHourDataMap);
|
|
|
}
|
|
|
|
|
|
@Override
|
|
@@ -239,4 +194,6 @@ public class MonitorDataServiceImpl implements MonitorDataService , Initializing
|
|
|
monitorDataMapper.batchInsertYearReport(dateTime.getYear(),dateTime.getMonthValue());
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+
|
|
|
}
|