|  | @@ -0,0 +1,218 @@
 | 
	
		
			
				|  |  | +package com.zcxk.job.service.impl;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +import com.alibaba.fastjson.JSONObject;
 | 
	
		
			
				|  |  | +import com.xxl.job.core.log.XxlJobLogger;
 | 
	
		
			
				|  |  | +import com.zcxk.job.dao.StatMeterUnreadDeviceMapper;
 | 
	
		
			
				|  |  | +import com.zcxk.job.entity.StatMeterUnreadDevice;
 | 
	
		
			
				|  |  | +import com.zcxk.job.service.WaterMeterUnreadDeviceService;
 | 
	
		
			
				|  |  | +import lombok.extern.slf4j.Slf4j;
 | 
	
		
			
				|  |  | +import org.springframework.beans.factory.annotation.Autowired;
 | 
	
		
			
				|  |  | +import org.springframework.data.mongodb.core.MongoTemplate;
 | 
	
		
			
				|  |  | +import org.springframework.data.mongodb.core.aggregation.Aggregation;
 | 
	
		
			
				|  |  | +import org.springframework.data.mongodb.core.aggregation.AggregationOperation;
 | 
	
		
			
				|  |  | +import org.springframework.data.mongodb.core.aggregation.AggregationResults;
 | 
	
		
			
				|  |  | +import org.springframework.data.mongodb.core.aggregation.ConditionalOperators;
 | 
	
		
			
				|  |  | +import org.springframework.data.mongodb.core.query.Criteria;
 | 
	
		
			
				|  |  | +import org.springframework.stereotype.Service;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +import java.time.LocalDateTime;
 | 
	
		
			
				|  |  | +import java.time.format.DateTimeFormatter;
 | 
	
		
			
				|  |  | +import java.time.temporal.TemporalAdjusters;
 | 
	
		
			
				|  |  | +import java.util.ArrayList;
 | 
	
		
			
				|  |  | +import java.util.List;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +@Slf4j
 | 
	
		
			
				|  |  | +@Service
 | 
	
		
			
				|  |  | +public class WaterMeterUnreadDeviceServiceImpl implements WaterMeterUnreadDeviceService {
 | 
	
		
			
				|  |  | +    @Autowired
 | 
	
		
			
				|  |  | +    private MongoTemplate mongoTemplate;
 | 
	
		
			
				|  |  | +    @Autowired
 | 
	
		
			
				|  |  | +    private StatMeterUnreadDeviceMapper statMeterUnreadDeviceMapper;
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @Override
 | 
	
		
			
				|  |  | +    public void yesterdayUnreadDeviceJobHandler(String param) {
 | 
	
		
			
				|  |  | +        JSONObject obj = JSONObject.parseObject(param);
 | 
	
		
			
				|  |  | +        XxlJobLogger.log("obj:{}", obj);
 | 
	
		
			
				|  |  | +        Integer statDay = 0;
 | 
	
		
			
				|  |  | +        Integer startDate = 0;
 | 
	
		
			
				|  |  | +        Integer endDate = 0;
 | 
	
		
			
				|  |  | +        if (obj != null) {
 | 
	
		
			
				|  |  | +            statDay =  obj.getInteger("statDay");
 | 
	
		
			
				|  |  | +            startDate =  obj.getInteger("startDate");
 | 
	
		
			
				|  |  | +            endDate =  obj.getInteger("endDate");
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if (statDay == null || statDay == 0) {
 | 
	
		
			
				|  |  | +            statDay = Integer.valueOf(LocalDateTime.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        if (startDate == null || startDate == 0) {
 | 
	
		
			
				|  |  | +            startDate = Integer.valueOf(LocalDateTime.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        if (endDate == null || endDate == 0) {
 | 
	
		
			
				|  |  | +            endDate = Integer.valueOf(LocalDateTime.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        XxlJobLogger.log("statDay:{},startDate:{},endDate:{}",statDay, startDate, endDate);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        List<StatMeterUnreadDevice> results = this.findUnreadDevice(statDay,startDate,endDate);
 | 
	
		
			
				|  |  | +        System.out.println(results);
 | 
	
		
			
				|  |  | +        if (results != null && results.size() > 0) {
 | 
	
		
			
				|  |  | +            log.info("结果数量,size:{}", results.size());
 | 
	
		
			
				|  |  | +            XxlJobLogger.log("结果数量,size:{}", results.size());
 | 
	
		
			
				|  |  | +            statMeterUnreadDeviceMapper.batchInsert("sc_stat_meter_unread_device_by_building",results);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @Override
 | 
	
		
			
				|  |  | +    public void recent7daysUnreadDeviceJobHandler(String param) {
 | 
	
		
			
				|  |  | +        JSONObject obj = JSONObject.parseObject(param);
 | 
	
		
			
				|  |  | +        XxlJobLogger.log("obj:{}", obj);
 | 
	
		
			
				|  |  | +        Integer statDay = 0;
 | 
	
		
			
				|  |  | +        Integer startDate = 0;
 | 
	
		
			
				|  |  | +        Integer endDate = 0;
 | 
	
		
			
				|  |  | +        if (obj != null) {
 | 
	
		
			
				|  |  | +            statDay =  obj.getInteger("statDay");
 | 
	
		
			
				|  |  | +            startDate =  obj.getInteger("startDate");
 | 
	
		
			
				|  |  | +            endDate =  obj.getInteger("endDate");
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if (statDay == null || statDay == 0) {
 | 
	
		
			
				|  |  | +            statDay = Integer.valueOf(LocalDateTime.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        if (startDate == null || startDate == 0) {
 | 
	
		
			
				|  |  | +            startDate = Integer.valueOf(LocalDateTime.now().plusDays(-7).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        if (endDate == null || endDate == 0) {
 | 
	
		
			
				|  |  | +            endDate = Integer.valueOf(LocalDateTime.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        XxlJobLogger.log("statDay:{},startDate:{},endDate:{}",statDay, startDate, endDate);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        List<StatMeterUnreadDevice> results = this.findUnreadDevice(statDay,startDate,endDate);
 | 
	
		
			
				|  |  | +        if (results != null && results.size() > 0) {
 | 
	
		
			
				|  |  | +            XxlJobLogger.log("结果数量,size:{}", results.size());
 | 
	
		
			
				|  |  | +            statMeterUnreadDeviceMapper.batchInsert("sc_stat_meter_unread_device_by_building_7day",results);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @Override
 | 
	
		
			
				|  |  | +    public void recent15daysUnreadDeviceJobHandler(String param) {
 | 
	
		
			
				|  |  | +        JSONObject obj = JSONObject.parseObject(param);
 | 
	
		
			
				|  |  | +        XxlJobLogger.log("obj:{}", obj);
 | 
	
		
			
				|  |  | +        Integer statDay = 0;
 | 
	
		
			
				|  |  | +        Integer startDate = 0;
 | 
	
		
			
				|  |  | +        Integer endDate = 0;
 | 
	
		
			
				|  |  | +        if (obj != null) {
 | 
	
		
			
				|  |  | +            statDay =  obj.getInteger("statDay");
 | 
	
		
			
				|  |  | +            startDate =  obj.getInteger("startDate");
 | 
	
		
			
				|  |  | +            endDate =  obj.getInteger("endDate");
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if (statDay == null || statDay == 0) {
 | 
	
		
			
				|  |  | +            statDay = Integer.valueOf(LocalDateTime.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        if (startDate == null || startDate == 0) {
 | 
	
		
			
				|  |  | +            startDate = Integer.valueOf(LocalDateTime.now().plusDays(-15).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        if (endDate == null || endDate == 0) {
 | 
	
		
			
				|  |  | +            endDate = Integer.valueOf(LocalDateTime.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        XxlJobLogger.log("statDay:{},startDate:{},endDate:{}",statDay, startDate, endDate);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        List<StatMeterUnreadDevice> results = this.findUnreadDevice(statDay,startDate,endDate);
 | 
	
		
			
				|  |  | +        if (results != null && results.size() > 0) {
 | 
	
		
			
				|  |  | +            XxlJobLogger.log("结果数量,size:{}", results.size());
 | 
	
		
			
				|  |  | +            statMeterUnreadDeviceMapper.batchInsert("sc_stat_meter_unread_device_by_building_15day",results);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    @Override
 | 
	
		
			
				|  |  | +    public void lastMonthUnreadDeviceJobHandler(String param) {
 | 
	
		
			
				|  |  | +        JSONObject obj = JSONObject.parseObject(param);
 | 
	
		
			
				|  |  | +        XxlJobLogger.log("obj:{}", obj);
 | 
	
		
			
				|  |  | +        Integer statDay = 0;
 | 
	
		
			
				|  |  | +        Integer startDate = 0;
 | 
	
		
			
				|  |  | +        Integer endDate = 0;
 | 
	
		
			
				|  |  | +        if (obj != null) {
 | 
	
		
			
				|  |  | +            statDay =  obj.getInteger("statDay");
 | 
	
		
			
				|  |  | +            startDate =  obj.getInteger("startDate");
 | 
	
		
			
				|  |  | +            endDate =  obj.getInteger("endDate");
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        if (statDay == null || statDay == 0) {
 | 
	
		
			
				|  |  | +            statDay = Integer.valueOf(LocalDateTime.now().plusMonths(-1).format(DateTimeFormatter.ofPattern("yyyyMM")));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        if (startDate == null || startDate == 0) {
 | 
	
		
			
				|  |  | +            startDate = Integer.valueOf(LocalDateTime.now().plusMonths(-1).with(TemporalAdjusters.firstDayOfMonth()).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        if (endDate == null || endDate == 0) {
 | 
	
		
			
				|  |  | +            endDate = Integer.valueOf(LocalDateTime.now().plusMonths(-1).with(TemporalAdjusters.lastDayOfMonth()).format(DateTimeFormatter.ofPattern("yyyyMMdd")));
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        XxlJobLogger.log("statDay:{},startDate:{},endDate:{}",statDay, startDate, endDate);
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        List<StatMeterUnreadDevice> results = this.findUnreadDevice(statDay,startDate,endDate);
 | 
	
		
			
				|  |  | +        if (results != null && results.size() > 0) {
 | 
	
		
			
				|  |  | +            XxlJobLogger.log("结果数量,size:{}", results.size());
 | 
	
		
			
				|  |  | +            statMeterUnreadDeviceMapper.batchInsert("sc_stat_meter_unread_device_by_building_month",results);
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +    protected List<StatMeterUnreadDevice> findUnreadDevice(Integer statDay, Integer startDate, Integer endDate){
 | 
	
		
			
				|  |  | +        /*
 | 
	
		
			
				|  |  | +            db.sc_meter_read_record.aggregate(
 | 
	
		
			
				|  |  | +             [
 | 
	
		
			
				|  |  | +              {$match:{status:1,readDate:20200619}},
 | 
	
		
			
				|  |  | +                {$group:{_id:{
 | 
	
		
			
				|  |  | +                        "siteId": "$siteId","channelId": "$channelId","customerId": "$customerId","buildingId": "$buildingId","deviceId": "$deviceId",
 | 
	
		
			
				|  |  | +                    },
 | 
	
		
			
				|  |  | +                            realReadTimes:{$sum:{
 | 
	
		
			
				|  |  | +                         $cond: { if: { $eq: ["$readStatus", "1"] }, then: 1, else: 0 }
 | 
	
		
			
				|  |  | +                       }}
 | 
	
		
			
				|  |  | +                }},
 | 
	
		
			
				|  |  | +				{$match:{realReadTimes:0}},
 | 
	
		
			
				|  |  | +                { $project : {
 | 
	
		
			
				|  |  | +                    siteId : 1 ,
 | 
	
		
			
				|  |  | +                    channelId : 1 ,
 | 
	
		
			
				|  |  | +                    customerId : 1 ,
 | 
	
		
			
				|  |  | +                    buildingId : 1 ,
 | 
	
		
			
				|  |  | +                    realReadTimes : 1 ,
 | 
	
		
			
				|  |  | +              }}
 | 
	
		
			
				|  |  | +             ]
 | 
	
		
			
				|  |  | +             ,{ allowDiskUse: true }
 | 
	
		
			
				|  |  | +             )
 | 
	
		
			
				|  |  | +         */
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        List<AggregationOperation> operations = new ArrayList<>();
 | 
	
		
			
				|  |  | +        operations.add(Aggregation.match(Criteria.where("status").is(1)));
 | 
	
		
			
				|  |  | +        operations.add(Aggregation.match(Criteria.where("readDate").gte(startDate).lte(endDate)));
 | 
	
		
			
				|  |  | +        // 1,添加分组条件
 | 
	
		
			
				|  |  | +        ConditionalOperators.Cond readTimesCond = ConditionalOperators.when(new Criteria("readStatus").is("1")).then(1).otherwise(0);
 | 
	
		
			
				|  |  | +        operations.add(Aggregation.group("siteId","channelId","customerId","buildingId","deviceId")
 | 
	
		
			
				|  |  | +                .sum(readTimesCond).as("realReadTimes"));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        //2、再次过滤未抄的设备
 | 
	
		
			
				|  |  | +        operations.add(Aggregation.match(Criteria.where("realReadTimes").is(0)));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        operations.add(Aggregation.project("realReadTimes").
 | 
	
		
			
				|  |  | +                and("_id.siteId").as("siteId").
 | 
	
		
			
				|  |  | +                and("_id.channelId").as("channelId").
 | 
	
		
			
				|  |  | +                and("_id.customerId").as("customerId").
 | 
	
		
			
				|  |  | +                and("_id.buildingId").as("buildingId").
 | 
	
		
			
				|  |  | +                and("_id.deviceId").as("deviceId").
 | 
	
		
			
				|  |  | +                        andExclude("_id"));
 | 
	
		
			
				|  |  | +        //operations.add(Aggregation.sort(Sort.Direction.DESC, "total"));
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  | +        // 3,聚合查询所有信息
 | 
	
		
			
				|  |  | +        Aggregation aggregation = Aggregation.newAggregation(operations);
 | 
	
		
			
				|  |  | +        // 4,查询结果
 | 
	
		
			
				|  |  | +        AggregationResults<StatMeterUnreadDevice> aggregationResults = mongoTemplate.aggregate(aggregation, "sc_meter_read_record", StatMeterUnreadDevice.class);
 | 
	
		
			
				|  |  | +        // 5,获取结果
 | 
	
		
			
				|  |  | +        List<StatMeterUnreadDevice> results = aggregationResults.getMappedResults();
 | 
	
		
			
				|  |  | +        if (results != null && results.size() > 0) {
 | 
	
		
			
				|  |  | +            for (StatMeterUnreadDevice result : results) {
 | 
	
		
			
				|  |  | +                result.setStatDay(statDay);
 | 
	
		
			
				|  |  | +                result.setDateCreate(LocalDateTime.now());
 | 
	
		
			
				|  |  | +            }
 | 
	
		
			
				|  |  | +        }
 | 
	
		
			
				|  |  | +        return results;
 | 
	
		
			
				|  |  | +    }
 | 
	
		
			
				|  |  | +}
 |