|
@@ -0,0 +1,242 @@
|
|
|
+package com.zcxk.job.service.impl;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSONObject;
|
|
|
+import com.xxl.job.core.log.XxlJobLogger;
|
|
|
+import com.zcxk.job.dao.StatMeterReadRateMapper;
|
|
|
+import com.zcxk.job.dto.ReadRateDTO;
|
|
|
+import com.zcxk.job.service.WaterMeterReadRateService;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.data.mongodb.core.MongoTemplate;
|
|
|
+import org.springframework.data.mongodb.core.aggregation.Aggregation;
|
|
|
+import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
|
|
|
+import org.springframework.data.mongodb.core.aggregation.AggregationResults;
|
|
|
+import org.springframework.data.mongodb.core.aggregation.ConditionalOperators;
|
|
|
+import org.springframework.data.mongodb.core.query.Criteria;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.math.BigDecimal;
|
|
|
+import java.time.LocalDateTime;
|
|
|
+import java.time.format.DateTimeFormatter;
|
|
|
+import java.time.temporal.TemporalAdjusters;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.List;
|
|
|
+
|
|
|
+
|
|
|
+@Service
|
|
|
+public class WaterMeterReadRateServiceImpl implements WaterMeterReadRateService {
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ private MongoTemplate mongoTemplate;
|
|
|
+ @Autowired
|
|
|
+ private StatMeterReadRateMapper statMeterReadRateMapper;
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void yesterdayReadRateJobHandler(String param) {
|
|
|
+ JSONObject obj = JSONObject.parseObject(param);
|
|
|
+ XxlJobLogger.log("obj:{}", obj);
|
|
|
+ Integer statDay = 0;
|
|
|
+ Integer startDate = 0;
|
|
|
+ Integer endDate = 0;
|
|
|
+ if (obj != null) {
|
|
|
+ statDay = obj.getInteger("statDay");
|
|
|
+ startDate = obj.getInteger("startDate");
|
|
|
+ endDate = obj.getInteger("endDate");
|
|
|
+ }
|
|
|
+
|
|
|
+ if (statDay == null || statDay == 0) {
|
|
|
+ statDay = Integer.valueOf(LocalDateTime.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
|
|
|
+ }
|
|
|
+ if (startDate == null || startDate == 0) {
|
|
|
+ startDate = Integer.valueOf(LocalDateTime.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
|
|
|
+ }
|
|
|
+ if (endDate == null || endDate == 0) {
|
|
|
+ endDate = Integer.valueOf(LocalDateTime.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
|
|
|
+ }
|
|
|
+ XxlJobLogger.log("statDay:{},startDate:{},endDate:{}",statDay, startDate, endDate);
|
|
|
+
|
|
|
+ List<ReadRateDTO> results = this.readRateJobHandler(statDay,startDate,endDate);
|
|
|
+ if (results != null && results.size() > 0) {
|
|
|
+ XxlJobLogger.log("结果数量,size:{}", results.size());
|
|
|
+ statMeterReadRateMapper.batchInsert("sc_stat_meter_read_rate_by_building",results);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void recent7daysReadRateJobHandler(String param) {
|
|
|
+ JSONObject obj = JSONObject.parseObject(param);
|
|
|
+ XxlJobLogger.log("obj:{}", obj);
|
|
|
+ Integer statDay = 0;
|
|
|
+ Integer startDate = 0;
|
|
|
+ Integer endDate = 0;
|
|
|
+ if (obj != null) {
|
|
|
+ statDay = obj.getInteger("statDay");
|
|
|
+ startDate = obj.getInteger("startDate");
|
|
|
+ endDate = obj.getInteger("endDate");
|
|
|
+ }
|
|
|
+
|
|
|
+ if (statDay == null || statDay == 0) {
|
|
|
+ statDay = Integer.valueOf(LocalDateTime.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
|
|
|
+ }
|
|
|
+ if (startDate == null || startDate == 0) {
|
|
|
+ startDate = Integer.valueOf(LocalDateTime.now().plusDays(-7).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
|
|
|
+ }
|
|
|
+ if (endDate == null || endDate == 0) {
|
|
|
+ endDate = Integer.valueOf(LocalDateTime.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
|
|
|
+ }
|
|
|
+ XxlJobLogger.log("statDay:{},startDate:{},endDate:{}",statDay, startDate, endDate);
|
|
|
+
|
|
|
+ List<ReadRateDTO> results = this.readRateJobHandler(statDay,startDate,endDate);
|
|
|
+ if (results != null && results.size() > 0) {
|
|
|
+ XxlJobLogger.log("结果数量,size:{}", results.size());
|
|
|
+ statMeterReadRateMapper.batchInsert("sc_stat_meter_read_rate_by_building_7day",results);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void recent15daysReadRateJobHandler(String param) {
|
|
|
+ JSONObject obj = JSONObject.parseObject(param);
|
|
|
+ XxlJobLogger.log("obj:{}", obj);
|
|
|
+ Integer statDay = 0;
|
|
|
+ Integer startDate = 0;
|
|
|
+ Integer endDate = 0;
|
|
|
+ if (obj != null) {
|
|
|
+ statDay = obj.getInteger("statDay");
|
|
|
+ startDate = obj.getInteger("startDate");
|
|
|
+ endDate = obj.getInteger("endDate");
|
|
|
+ }
|
|
|
+
|
|
|
+ if (statDay == null || statDay == 0) {
|
|
|
+ statDay = Integer.valueOf(LocalDateTime.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
|
|
|
+ }
|
|
|
+ if (startDate == null || startDate == 0) {
|
|
|
+ startDate = Integer.valueOf(LocalDateTime.now().plusDays(-15).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
|
|
|
+ }
|
|
|
+ if (endDate == null || endDate == 0) {
|
|
|
+ endDate = Integer.valueOf(LocalDateTime.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
|
|
|
+ }
|
|
|
+ XxlJobLogger.log("statDay:{},startDate:{},endDate:{}",statDay, startDate, endDate);
|
|
|
+
|
|
|
+ List<ReadRateDTO> results = this.readRateJobHandler(statDay,startDate,endDate);
|
|
|
+ if (results != null && results.size() > 0) {
|
|
|
+ XxlJobLogger.log("结果数量,size:{}", results.size());
|
|
|
+ statMeterReadRateMapper.batchInsert("sc_stat_meter_read_rate_by_building_15day",results);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void lastMonthReadRateJobHandler(String param) {
|
|
|
+ JSONObject obj = JSONObject.parseObject(param);
|
|
|
+ XxlJobLogger.log("obj:{}", obj);
|
|
|
+ Integer statDay = 0;
|
|
|
+ Integer startDate = 0;
|
|
|
+ Integer endDate = 0;
|
|
|
+ if (obj != null) {
|
|
|
+ statDay = obj.getInteger("statDay");
|
|
|
+ startDate = obj.getInteger("startDate");
|
|
|
+ endDate = obj.getInteger("endDate");
|
|
|
+ }
|
|
|
+
|
|
|
+ if (statDay == null || statDay == 0) {
|
|
|
+ statDay = Integer.valueOf(LocalDateTime.now().plusMonths(-1).format(DateTimeFormatter.ofPattern("yyyyMM")));
|
|
|
+ }
|
|
|
+ if (startDate == null || startDate == 0) {
|
|
|
+ startDate = Integer.valueOf(LocalDateTime.now().plusMonths(-1).with(TemporalAdjusters.firstDayOfMonth()).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
|
|
|
+ }
|
|
|
+ if (endDate == null || endDate == 0) {
|
|
|
+ endDate = Integer.valueOf(LocalDateTime.now().plusMonths(-1).with(TemporalAdjusters.lastDayOfMonth()).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
|
|
|
+ }
|
|
|
+ XxlJobLogger.log("statDay:{},startDate:{},endDate:{}",statDay, startDate, endDate);
|
|
|
+
|
|
|
+ List<ReadRateDTO> results = this.readRateJobHandler(statDay,startDate,endDate);
|
|
|
+ if (results != null && results.size() > 0) {
|
|
|
+ XxlJobLogger.log("结果数量,size:{}", results.size());
|
|
|
+ statMeterReadRateMapper.batchInsert("sc_stat_meter_read_rate_by_building_month",results);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ protected List<ReadRateDTO> readRateJobHandler(Integer statDay, Integer startDate, Integer endDate){
|
|
|
+ /*
|
|
|
+ db.sc_meter_read_record.aggregate(
|
|
|
+ [
|
|
|
+ {$match:{status:1,readDate:20200619}},
|
|
|
+ {$group:{_id:{
|
|
|
+ "siteId": "$siteId","channelId": "$channelId","customerId": "$customerId","buildingId": "$buildingId","deviceId": "$deviceId",
|
|
|
+ },
|
|
|
+ realReadTimes:{$sum:{
|
|
|
+ $cond: { if: { $eq: ["$readStatus", "1"] }, then: 1, else: 0 }
|
|
|
+ }}
|
|
|
+ }},
|
|
|
+ {$group:{_id:{
|
|
|
+ "siteId": "$_id.siteId","channelId": "$_id.channelId","customerId": "$_id.customerId","buildingId": "$_id.buildingId",
|
|
|
+ },
|
|
|
+ total:{$sum:1},
|
|
|
+ realReadTimes:{$sum:{
|
|
|
+ $cond: { if: { $ne: ["$realReadTimes", 0] }, then: 1, else: 0 }
|
|
|
+ }},
|
|
|
+ unReadTimes:{$sum:{
|
|
|
+ $cond: { if: { $eq: ["$realReadTimes", 0] }, then: 1, else: 0 }
|
|
|
+ }}
|
|
|
+ }},
|
|
|
+ { $project : {
|
|
|
+ siteId : 1 ,
|
|
|
+ channelId : 1 ,
|
|
|
+ customerId : 1 ,
|
|
|
+ buildingId : 1 ,
|
|
|
+ total : 1 ,
|
|
|
+ realReadTimes : 1 ,
|
|
|
+ unReadTimes : 1 ,
|
|
|
+ readRate : { $divide: [ "$realReadTimes", "$total" ] }
|
|
|
+ }}
|
|
|
+ ]
|
|
|
+ ,{ allowDiskUse: true }
|
|
|
+ )
|
|
|
+ */
|
|
|
+
|
|
|
+
|
|
|
+ List<AggregationOperation> operations = new ArrayList<>();
|
|
|
+ operations.add(Aggregation.match(Criteria.where("status").is(1)));
|
|
|
+ operations.add(Aggregation.match(Criteria.where("readDate").gte(startDate).lte(endDate)));
|
|
|
+ // 1,添加分组条件
|
|
|
+ ConditionalOperators.Cond readTimesCond = ConditionalOperators.when(new Criteria("readStatus").is("1")).then(1).otherwise(0);
|
|
|
+ operations.add(Aggregation.group("siteId","channelId","customerId","buildingId","deviceId")
|
|
|
+ .sum(readTimesCond).as("realReadTimes"));
|
|
|
+ // 2,添加分组条件
|
|
|
+ ConditionalOperators.Cond readTimesCond1 = ConditionalOperators.when(new Criteria("realReadTimes").ne(0)).then(1).otherwise(0);
|
|
|
+ ConditionalOperators.Cond unReadTimesCond1= ConditionalOperators.when(new Criteria("realReadTimes").is(0)).then(1).otherwise(0);
|
|
|
+ operations.add(Aggregation.group("siteId","channelId","customerId","buildingId")
|
|
|
+ .sum(readTimesCond1).as("realReadTimes")
|
|
|
+ .sum(unReadTimesCond1).as("unReadTimes")
|
|
|
+ .count().as("total"));
|
|
|
+
|
|
|
+ operations.add(Aggregation.project("realReadTimes","unReadTimes").
|
|
|
+ and("_id.siteId").as("siteId").
|
|
|
+ and("_id.channelId").as("channelId").
|
|
|
+ and("_id.customerId").as("customerId").
|
|
|
+ and("_id.buildingId").as("buildingId").
|
|
|
+ and("total").as("deviceCount").
|
|
|
+ //and("readTimes").divide("total").as("readRate").
|
|
|
+ andExclude("_id"));
|
|
|
+ //operations.add(Aggregation.sort(Sort.Direction.DESC, "total"));
|
|
|
+
|
|
|
+ // 3,聚合查询所有信息
|
|
|
+ Aggregation aggregation = Aggregation.newAggregation(operations);
|
|
|
+ // 4,查询结果
|
|
|
+ AggregationResults<ReadRateDTO> aggregationResults = mongoTemplate.aggregate(aggregation, "sc_meter_read_record", ReadRateDTO.class);
|
|
|
+ // 5,获取结果
|
|
|
+ List<ReadRateDTO> results = aggregationResults.getMappedResults();
|
|
|
+ if (results != null && results.size() > 0) {
|
|
|
+ for (ReadRateDTO result : results) {
|
|
|
+ if (result.getDeviceCount() != 0) {
|
|
|
+ BigDecimal realReadTimes = new BigDecimal(result.getRealReadTimes()* 100);
|
|
|
+ BigDecimal total = new BigDecimal(result.getDeviceCount());
|
|
|
+ result.setReadRate(realReadTimes.divide(total,2,BigDecimal.ROUND_HALF_UP).doubleValue());
|
|
|
+ }
|
|
|
+ result.setStatDay(statDay);
|
|
|
+ result.setReadTimes(result.getDeviceCount());
|
|
|
+ result.setDateCreate(LocalDateTime.now());
|
|
|
+ result.setDateUpdate(LocalDateTime.now());
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return results;
|
|
|
+ }
|
|
|
+}
|