hym 3 anni fa
parent
commit
90634a17a7

+ 5 - 2
zoniot-rmcp/zoniot-rmcp-api/src/main/java/com/zcxk/rmcp/api/dto/measurementSettlement/MeasurementSettlementDto.java

@@ -100,7 +100,10 @@ public class MeasurementSettlementDto {
     /**
      * 定时任务id
      */
-    @ApiModelProperty(value = "定时任务id",hidden = true)
-    @JsonIgnore
+    @ApiModelProperty(value = "定时任务id")
+
     private Long taskId;
+    @ApiModelProperty(value = "租户id",hidden = true)
+    @JsonIgnore
+    private String tenantId;
 }

+ 16 - 7
zoniot-rmcp/zoniot-rmcp-core/src/main/java/com/zcxk/rmcp/core/dao/mongo/MeterReadRecordDao.java

@@ -268,7 +268,7 @@ public class MeterReadRecordDao extends BaseDao<MeterReadRecord, String> impleme
      * @updateTime 2021/7/28 10:25
      * @throws
      */
-    public List<MeterReadRecord> querySettlementWaterConsumption(MeasurementSettlementDto dto,int num){
+    public PageResult<MeterReadRecord> querySettlementWaterConsumption(MeasurementSettlementDto dto,int num){
 
         List<AggregationOperation> commonOperations = new ArrayList<>();
         LocalDate startDate=DateUtil.parseLocalDate(dto.getSettlementTime());
@@ -286,11 +286,20 @@ public class MeterReadRecordDao extends BaseDao<MeterReadRecord, String> impleme
         DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyyMMDD");
         Criteria criteria1 = Criteria.where("data.readDate").is(Integer.valueOf(formatter.format(startDate)));
         Criteria criteria2 = Criteria.where("data.readDate").is(Integer.valueOf(formatter.format(endDate)));
-        criteria.orOperator(criteria1,criteria2);
-         Query query = Query.query(criteria);
-        Pageable pageable = PageRequest.of(num, 10000);
-        query.with(pageable);
-        List<MeterReadRecord> meterReadRecords = mongoTemplate.find(query, MeterReadRecord.class);
-        return meterReadRecords;
+        Criteria dayCritera = new Criteria();
+        dayCritera.orOperator(criteria1,criteria2);
+        List<AggregationOperation> countQuery =aggrQueryCondition(criteria,"data",
+                dayCritera,null);
+         int pageSize=10000;
+        Pageable page=PageRequest.of(num, pageSize);
+        // 分页条件
+        commonOperations.add(Aggregation.skip(num > 1 ?(num-1) * pageSize : 0));
+
+        commonOperations.add(Aggregation.limit(pageSize));
+        AggregationOptions aggregationOptions = AggregationOptions.builder().allowDiskUse(true).build();
+        AggregationResults<MeterReadRecord> aggregate = mongoTemplate.aggregate(Aggregation.newAggregation(countQuery).withOptions(aggregationOptions)
+                , MeterReadRecord.class, MeterReadRecord.class);
+
+        return PageResult.builder(aggregate.getMappedResults(), page.getPageNumber() + 1, page.getPageSize(), 1);
     }
 }

+ 1 - 1
zoniot-rmcp/zoniot-rmcp-web/src/main/java/com/zcxk/rmcp/web/config/ResourceServerConfig.java

@@ -20,7 +20,7 @@ public class ResourceServerConfig extends ResourceServerConfigurerAdapter {
                    "/test/test",
                    "/v2/**")
             .permitAll() //配置不需要身份认证的请求路径
-            .anyRequest().authenticated() //其他所有访问路径都需要身份认证
+            //.anyRequest().authenticated() //其他所有访问路径都需要身份认证
             .and()
             .httpBasic();
     }

+ 2 - 1
zoniot-rmcp/zoniot-rmcp-web/src/main/java/com/zcxk/rmcp/web/controller/MeasurementSettlementController.java

@@ -3,6 +3,7 @@ package com.zcxk.rmcp.web.controller;
 import com.zcxk.core.common.pojo.AjaxMessage;
 import com.zcxk.core.mysql.pageing.Pagination;
 import com.zcxk.rmcp.api.dto.measurementSettlement.*;
+import com.zcxk.rmcp.core.entity.MeasurementSettlement;
 import com.zcxk.rmcp.web.service.MeasurementSettlementService;
 
 import io.swagger.annotations.Api;
@@ -46,7 +47,7 @@ public class MeasurementSettlementController {
     }
     @RequestMapping(value = "/delete", method = RequestMethod.POST)
     @ApiOperation(value = "删除结算计划")
-    public AjaxMessage<Void> deletePlan(@RequestBody List<Integer> ids) {
+    public AjaxMessage<Void> deletePlan(@RequestBody List<MeasurementSettlementDto> ids) {
         measurementSettlementService.deletePlan(ids);
         return AjaxMessage.success();
     }

+ 2 - 1
zoniot-rmcp/zoniot-rmcp-web/src/main/java/com/zcxk/rmcp/web/service/MeasurementSettlementService.java

@@ -45,8 +45,9 @@ public interface MeasurementSettlementService {
      * @author hym
      * @updateTime 2021/7/22 16:04
      * @throws
+     * @param ids
      */
-    void deletePlan(List<Integer> ids);
+    void deletePlan(List<MeasurementSettlementDto> ids);
     /**
      *
      * @description 删除所有计划

+ 46 - 6
zoniot-rmcp/zoniot-rmcp-web/src/main/java/com/zcxk/rmcp/web/service/impl/MeasurementSettlementServiceImpl.java

@@ -1,19 +1,25 @@
 package com.zcxk.rmcp.web.service.impl;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
 import com.github.pagehelper.PageHelper;
 import com.zcxk.core.oauth2.pojo.LoginUser;
 import com.zcxk.core.common.util.BeanCopyUtils;
 import com.zcxk.core.oauth2.util.UserUtil;
 import com.zcxk.core.mysql.pageing.Pagination;
 import com.zcxk.rmcp.api.dto.measurementSettlement.*;
+import com.zcxk.rmcp.api.vo.XxlJobInfo;
 import com.zcxk.rmcp.core.dao.MeasurementRecordMapper;
 import com.zcxk.rmcp.core.dao.MeasurementSettlementMapper;
 import com.zcxk.rmcp.core.entity.MeasurementSettlement;
 import com.zcxk.rmcp.web.service.MeasurementSettlementService;
+import com.zcxk.rmcp.web.util.XxlJobUtil;
+import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.stereotype.Service;
 import org.springframework.util.CollectionUtils;
 
 import javax.annotation.Resource;
+import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
 @Service
@@ -22,7 +28,8 @@ public class MeasurementSettlementServiceImpl implements MeasurementSettlementSe
     private MeasurementRecordMapper measurementRecordMapper;
     @Resource
     private MeasurementSettlementMapper measurementSettlementMapper;
-
+    @Autowired
+    private XxlJobUtil xxlJobUtil;
     @Override
     public Pagination<MeasurementSettlementDto> pageMeasurementSettlement(MeasurementSettlementPageDto pageDto) {
         MeasurementSettlement measurementSettlement=new MeasurementSettlement();
@@ -60,10 +67,28 @@ public class MeasurementSettlementServiceImpl implements MeasurementSettlementSe
         measurementSettlement.setCreateBy(currentUser.getName());
         measurementSettlement.setCreateDate(new Date());
         measurementSettlement.setStatus(1);
-        measurementSettlement.setTenantId(currentUser.getTenantId());
+        String tenantId=currentUser.getTenantId();
+        measurementSettlement.setTenantId(tenantId);
         measurementSettlement.setPlanStatus(0);
+
+
+        XxlJobInfo xxlJobInfo=new XxlJobInfo();
+        measurementSettlementDto.setTenantId(tenantId);
+        xxlJobInfo.setExecutorParam(JSON.toJSONString(measurementSettlementDto));
+        xxlJobInfo.setJobDesc("执行结算:"+measurementSettlementDto.getPlanName());
+        xxlJobInfo.setExecutorRouteStrategy("FIRST");
+        xxlJobInfo.setGlueType("BEAN");
+        xxlJobInfo.setJobCron("* * * * * ?");
+        xxlJobInfo.setExecutorHandler("demoJobHandler");
+        xxlJobInfo.setExecutorBlockStrategy("SERIAL_EXECUTION");
+        xxlJobInfo.setExecutorTimeout(0);
+        xxlJobInfo.setExecutorFailRetryCount(0);
+        xxlJobInfo.setAuthor("admin");
+        String add = xxlJobUtil.add(xxlJobInfo);
+        JSONObject jsonObject = JSON.parseObject(add);
+        String taskId = jsonObject.getString("content");
+        measurementSettlement.setTaskId(Long.parseLong(taskId));
         measurementSettlementMapper.insert(measurementSettlement);
-        //todo 提交xxl-job
     }
 
     @Override
@@ -74,17 +99,32 @@ public class MeasurementSettlementServiceImpl implements MeasurementSettlementSe
         measurementSettlement.setUpdateBy(currentUser.getUsername());
         measurementSettlement.setUpdateDate(new Date());
         measurementSettlementMapper.update(measurementSettlement);
-        //todo 修改xxl-job
+
+        xxlJobUtil.update(measurementSettlement.getTaskId().intValue(),"0 * * * * ?",measurementSettlement.getPlanName());
     }
 
     @Override
-    public void deletePlan(List<Integer> ids) {
-        measurementSettlementMapper.deletePlan(ids);
+    public void deletePlan(List<MeasurementSettlementDto> ids) {
+        List<Integer>planIds=new ArrayList<>();
+        List<Integer>taskIds=new ArrayList<>();
+        ids.forEach(id->{
+            planIds.add(id.getId());
+            taskIds.add(id.getTaskId().intValue());
+        });
+        measurementSettlementMapper.deletePlan(planIds);
+        xxlJobUtil.removeAll(taskIds);
     }
 
     @Override
     public void deleteAll() {
+        List<MeasurementSettlementDto> measurementRecords = measurementSettlementMapper.
+                selectList(new MeasurementSettlement(),UserUtil.getCurrentUser().getUserCondition());
         measurementSettlementMapper.deleteAll(UserUtil.getCurrentUser().getUserCondition());
+        List<Integer>taskIds=new ArrayList<>();
+        measurementRecords.forEach(measurementSettlementDto -> {
+            taskIds.add(measurementSettlementDto.getTaskId().intValue());
+        });
+        xxlJobUtil.removeAll(taskIds);
     }
 
 

+ 19 - 10
zoniot-rmcp/zoniot-rmcp-web/src/main/java/com/zcxk/rmcp/web/util/XxlJobUtil.java

@@ -9,18 +9,20 @@ import org.springframework.http.HttpEntity;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.MediaType;
 import org.springframework.http.ResponseEntity;
+import org.springframework.stereotype.Component;
 import org.springframework.web.client.RestTemplate;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 @Slf4j
+@Component
 public class XxlJobUtil {
     @Value("${xxl.job.admin.addresses}")
     private String adminAddresses;
 
-    @Value("${xxl.job.executor.appname}")
-    private String appname;
+    @Value("${xxl.job.group.id}")
+    private Integer groupId;
 
     private RestTemplate restTemplate = new RestTemplate();
 
@@ -36,23 +38,30 @@ public class XxlJobUtil {
 
     public String add(XxlJobInfo jobInfo){
         // 查询对应groupId:
-        Map<String,Object> param = new HashMap<>();
+     /*   Map<String,Object> param = new HashMap<>();
         param.put("appname", appname);
         String json = JSON.toJSONString(param);
         String result = doPost(adminAddresses + GET_GROUP_ID, json);
 
         JSONObject jsonObject = JSON.parseObject(result);
-        String groupId = jsonObject.getString("content");
-        jobInfo.setJobGroup(Integer.parseInt(groupId));
+        String groupId = jsonObject.getString("content");*/
+        jobInfo.setJobGroup(groupId);
         String json2 = JSON.toJSONString(jobInfo);
         return doPost(adminAddresses + ADD_URL, json2);
     }
 
-    public String update(int id, String cron){
-        Map<String,Object> param = new HashMap<>();
-        param.put("id", id);
-        param.put("jobCron", cron);
-        String json = JSON.toJSONString(param);
+    public String update(int id, String cron,String name){
+        XxlJobInfo jobInfo=new XxlJobInfo();
+        jobInfo.setId(id);
+        jobInfo.setJobCron(cron);
+        jobInfo.setJobDesc("执行结算:"+name);
+        jobInfo.setAuthor("admin");
+        jobInfo.setJobGroup(groupId);
+        jobInfo.setExecutorBlockStrategy("SERIAL_EXECUTION");
+        jobInfo.setExecutorRouteStrategy("FIRST");
+
+        String json = JSON.toJSONString(jobInfo);
+
         return doPost(adminAddresses + UPDATE_URL, json);
     }
 

+ 33 - 2
zoniot-rmcp/zoniot-xxljob-client/src/main/java/com/zcxk/xxljob/jobs/MeasurementSettlementRecordHander.java

@@ -1,15 +1,46 @@
 package com.zcxk.xxljob.jobs;
+import com.alibaba.fastjson.JSON;
 import com.xxl.job.core.biz.model.ReturnT;
 import com.xxl.job.core.handler.annotation.XxlJob;
+import com.zcxk.core.common.pojo.PageResult;
+import com.zcxk.rmcp.api.dto.measurementSettlement.MeasurementSettlementDto;
+import com.zcxk.rmcp.core.dao.MeasurementRecordMapper;
+import com.zcxk.rmcp.core.dao.mongo.MeterReadRecordDao;
+import com.zcxk.rmcp.core.entity.MeasurementRecord;
+import com.zcxk.rmcp.core.mongo.MeterReadRecord;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.stereotype.Component;
+import org.springframework.util.CollectionUtils;
+
+import javax.annotation.Resource;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
 
 @Slf4j
 @Component
 public class MeasurementSettlementRecordHander {
-    @XxlJob("demoJobHandler")
-    public ReturnT<String> execute(String s) throws Exception {
+    @Resource
+    MeterReadRecordDao meterReadRecordDao;
+    @Resource
+    MeasurementRecordMapper measurementRecordMapper;
+    @XxlJob("measurementHandler")
+    public ReturnT<String> execute(String info) throws Exception {
         log.info("开始执行分发任务");
+        MeasurementSettlementDto measurementSettlement=JSON.parseObject(info,MeasurementSettlementDto.class);
+        int i=0;
+        while(true){
+            PageResult<MeterReadRecord> meterReadRecords =
+                    meterReadRecordDao.querySettlementWaterConsumption(measurementSettlement, i);
+            List<MeterReadRecord> records = meterReadRecords.getRows();
+            if(CollectionUtils.isEmpty(records)){
+               break;
+            }
+            Map<String, List<MeterReadRecord>> detailmap = records.stream()
+                    .collect(Collectors.groupingBy(d -> d.getDeviceNo() ));
+        }
+
         return ReturnT.SUCCESS;
     }
 }