Browse Source

同步设备建筑小区

lin 4 years ago
parent
commit
acedbfcb98
29 changed files with 672 additions and 20 deletions
  1. 4 0
      meter-reading-common/src/main/java/com/huaxu/zoniot/dao/CustomerMapper.java
  2. 8 0
      meter-reading-common/src/main/java/com/huaxu/zoniot/dao/WaterMeterMapper.java
  3. 37 0
      meter-reading-common/src/main/java/com/huaxu/zoniot/model/BuildingData.java
  4. 40 0
      meter-reading-common/src/main/java/com/huaxu/zoniot/model/CommunityData.java
  5. 30 0
      meter-reading-common/src/main/java/com/huaxu/zoniot/model/DeviceData.java
  6. 16 0
      meter-reading-common/src/main/resources/mapper/CustomerMapper.xml
  7. 38 0
      meter-reading-common/src/main/resources/mapper/WaterMeterMapper.xml
  8. 52 0
      meter-reading-job/src/main/java/com/huaxu/zoniot/config/RabbitConfig.java
  9. 5 1
      meter-reading-job/src/main/java/com/huaxu/zoniot/dao/PushLogMapper.java
  10. 5 0
      meter-reading-job/src/main/java/com/huaxu/zoniot/entity/PushLog.java
  11. 29 0
      meter-reading-job/src/main/java/com/huaxu/zoniot/job/BuildingSyncJob.java
  12. 30 0
      meter-reading-job/src/main/java/com/huaxu/zoniot/job/CommunitySyncJob.java
  13. 2 2
      meter-reading-job/src/main/java/com/huaxu/zoniot/job/MeterErrorDaysJob.java
  14. 9 0
      meter-reading-job/src/main/java/com/huaxu/zoniot/job/MeterFileSyncJob.java
  15. 5 0
      meter-reading-job/src/main/java/com/huaxu/zoniot/service/BuildingSyncService.java
  16. 5 0
      meter-reading-job/src/main/java/com/huaxu/zoniot/service/CommunitySyncService.java
  17. 7 0
      meter-reading-job/src/main/java/com/huaxu/zoniot/service/MeterFileSyncService.java
  18. 86 0
      meter-reading-job/src/main/java/com/huaxu/zoniot/service/impl/BuildingSyncServiceImpl.java
  19. 86 0
      meter-reading-job/src/main/java/com/huaxu/zoniot/service/impl/CommunitySyncServiceImpl.java
  20. 66 4
      meter-reading-job/src/main/java/com/huaxu/zoniot/service/impl/MeterFileSyncServiceImpl.java
  21. 31 0
      meter-reading-job/src/main/java/com/huaxu/zoniot/web/TestController.java
  22. 11 7
      meter-reading-job/src/main/resources/application-job-dev.properties
  23. 10 0
      meter-reading-job/src/main/resources/mapper/PushLogMapper.xml
  24. 29 5
      meter-reading-wugang/src/main/java/com/zoniot/wg/controller/ExternalApiController.java
  25. 3 0
      meter-reading-wugang/src/main/java/com/zoniot/wg/dao/DeviceMapper.java
  26. 1 0
      meter-reading-wugang/src/main/java/com/zoniot/wg/dao/MeterReadRecordMapper.java
  27. 2 0
      meter-reading-wugang/src/main/java/com/zoniot/wg/repository/MeterReadRecordRepository.java
  28. 8 0
      meter-reading-wugang/src/main/resources/mapper/DeviceMapper.xml
  29. 17 1
      meter-reading-wugang/src/main/resources/mapper/MeterReadRecordMapper.xml

+ 4 - 0
meter-reading-common/src/main/java/com/huaxu/zoniot/dao/CustomerMapper.java

@@ -2,11 +2,13 @@ package com.huaxu.zoniot.dao;
 
 import com.huaxu.zoniot.entity.Customer;
 import org.apache.ibatis.annotations.Mapper;
+import org.apache.ibatis.annotations.Param;
 
 import java.util.List;
 
 /**
  * <p></p>
+ *
  * @Author wilian.peng
  * @Date 2021/1/14 15:59
  * @Version 1.0
@@ -26,4 +28,6 @@ public interface CustomerMapper {
     int updateByPrimaryKeySelective(Customer record);
 
     int updateByPrimaryKey(Customer record);
+
+    List<Integer> getSubId(@Param("customerId") Integer customerId);
 }

+ 8 - 0
meter-reading-common/src/main/java/com/huaxu/zoniot/dao/WaterMeterMapper.java

@@ -1,6 +1,7 @@
 package com.huaxu.zoniot.dao;
 
 import com.huaxu.zoniot.entity.WaterMeter;
+import com.huaxu.zoniot.model.DeviceData;
 import org.apache.ibatis.annotations.Mapper;
 import org.apache.ibatis.annotations.Param;
 
@@ -89,4 +90,11 @@ public interface WaterMeterMapper {
             @Param("status") Integer status,
             @Param("beginDate") Date beginDate,
             @Param("endDate") Date endDate);
+
+    List<DeviceData> getPushMeterListV2(
+            @Param("customers") String customers,
+            @Param("channels") String channels,
+            @Param("status") Integer status,
+            @Param("beginDate") Date beginDate,
+            @Param("endDate") Date endDate);
 }

+ 37 - 0
meter-reading-common/src/main/java/com/huaxu/zoniot/model/BuildingData.java

@@ -0,0 +1,37 @@
+package com.huaxu.zoniot.model;
+
+import lombok.Data;
+
+import java.time.LocalDateTime;
+
+@Data
+public class BuildingData {
+
+    private Integer id;
+
+    private Integer communityId;
+
+    private Integer customerId;
+
+    private String name;
+
+    private String address;
+
+    private Integer province;
+
+    private Integer city;
+
+    private Integer region;
+
+    private Double longitude;
+
+    private Double latitude;
+
+    private Integer status;
+
+    private LocalDateTime createDate;
+
+    private LocalDateTime updateDate;
+
+
+}

+ 40 - 0
meter-reading-common/src/main/java/com/huaxu/zoniot/model/CommunityData.java

@@ -0,0 +1,40 @@
+package com.huaxu.zoniot.model;
+
+import lombok.Data;
+
+import java.time.LocalDateTime;
+import java.util.Date;
+
+@Data
+public class CommunityData {
+    private Integer id;
+
+    private String name;
+
+    private String code;
+
+    private Integer customerId;
+
+    private Integer province;
+
+    private Integer city;
+
+    private Integer region;
+
+    private Double longitude;
+
+    private Double latitude;
+
+    private Integer districtId;
+
+    private String address;
+
+    private String remark;
+
+    private Integer status;
+
+    private Date dateCreate;
+
+    private Date dateUpdate;
+
+}

+ 30 - 0
meter-reading-common/src/main/java/com/huaxu/zoniot/model/DeviceData.java

@@ -0,0 +1,30 @@
+package com.huaxu.zoniot.model;
+
+import com.alibaba.fastjson.annotation.JSONField;
+import lombok.Data;
+
+import java.util.Date;
+
+@Data
+public class DeviceData {
+    private Long id;
+    private String deviceNo;
+    private String meterNo;
+    private String fileNo;
+    private String sealNo;
+    private Integer sysId;
+    private Integer customerId;
+    private Integer deviceTypeId;
+    private Integer manufacturerId;
+    private Integer buildingId;
+    private Integer communityId;
+    private String locDesc;
+    private Double meterReading;
+    private Integer valveStatus;
+    @JSONField(format="yyyy-MM-dd HH:mm:ss")
+    private Date lastReceiveTime;
+    @JSONField(format="yyyy-MM-dd HH:mm:ss")
+    private Date dateCreate;
+    @JSONField(format="yyyy-MM-dd HH:mm:ss")
+    private Date dateUpdate;
+}

+ 16 - 0
meter-reading-common/src/main/resources/mapper/CustomerMapper.xml

@@ -277,4 +277,20 @@
       pay_invoice_type = #{payInvoiceType,jdbcType=INTEGER}
     where id = #{id,jdbcType=INTEGER}
   </update>
+
+  <select id="getSubId" resultType="java.lang.Integer">
+    SELECT  DATA.id FROM(
+                                SELECT
+                                        @ids as _ids,
+                                        (   SELECT @ids := GROUP_CONCAT(id)
+                                            FROM sc_customer
+                                            WHERE FIND_IN_SET(parent_id, @ids)
+                                                ) as cids,
+                                        @l := @l+1 as level
+                                FROM sc_customer,
+                                     (SELECT @ids :=#{customerId}, @l := 0 ) b
+                                WHERE @ids IS NOT NULL
+                                ) id, sc_customer DATA
+    WHERE FIND_IN_SET(DATA.id, ID._ids)
+  </select>
 </mapper>

+ 38 - 0
meter-reading-common/src/main/resources/mapper/WaterMeterMapper.xml

@@ -344,4 +344,42 @@
         and d.date_update > #{beginDate ,jdbcType=TIMESTAMP }
         and d.date_update <![CDATA[ <= ]]> #{endDate ,jdbcType=TIMESTAMP }
     </select>
+
+    <select id="getPushMeterListV2" resultType="com.huaxu.zoniot.model.DeviceData">
+        select
+            sd.id,
+            sd.device_no,
+            sd.water_meter_no as meter_no,
+            sd.water_meter_file_no as file_no,
+            sd.seal_no,
+            sd.sys_id,
+            sd.customer_id,
+            sd.device_type as device_type_id,
+            sd.manufacturer_id,
+            sd.building_id,
+            sb.community as community_id,
+            sd.loc_desc,
+            swmed.meter_reading,
+            swmed.valve_status,
+            sd.device_status,
+            sd.last_receive_time,
+            sd.status,
+            sd.date_create,
+            sd.date_update
+        from sc_device sd
+        LEFT JOIN sc_building sb on (sd.building_id =sb.id)
+        LEFT JOIN sc_water_meter_error_days swmed on (swmed.device_id =sd.id)
+        where sd.sys_id != -99
+        <if test="customers != null and customers != '' ">
+            AND FIND_IN_SET( sd.customer_id, #{customers} )
+        </if>
+        <if test="channels != null and channels != '' ">
+            AND FIND_IN_SET( sd.sys_id, #{channels} )
+        </if>
+        <if test="status != null">
+            AND sd.status = #{status}
+        </if>
+        and sd.date_update > #{beginDate ,jdbcType=TIMESTAMP }
+        and sd.date_update <![CDATA[ <= ]]> #{endDate ,jdbcType=TIMESTAMP }
+    </select>
 </mapper>

+ 52 - 0
meter-reading-job/src/main/java/com/huaxu/zoniot/config/RabbitConfig.java

@@ -0,0 +1,52 @@
+package com.huaxu.zoniot.config;
+
+
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.FanoutExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+
+/**
+ * <p>RabbitMQ配置</p>
+ *
+ * @Author wilian.peng
+ * @Date 2020/4/7 11:50
+ * @Version 1.0
+ */
+@Configuration
+public class RabbitConfig {
+
+    @Value("${spring.rabbitmq.exchange}")
+    private String exchange;
+    @Value("${spring.rabbitmq.device.queue}")
+    private String queue;
+
+
+    //创建队列
+    @Bean
+    public Queue createFanoutQueue() {
+        return new Queue(queue);
+    }
+    //创建交换机
+    @Bean
+    public FanoutExchange defFanoutExchange() {
+        return new FanoutExchange(exchange);
+    }
+
+    //队列与交换机进行绑定
+    @Bean
+    Binding bindingFanout() {
+        return BindingBuilder.bind(createFanoutQueue()).
+                to(defFanoutExchange());
+    }
+
+
+
+}

+ 5 - 1
meter-reading-job/src/main/java/com/huaxu/zoniot/dao/PushLogMapper.java

@@ -6,6 +6,7 @@ import org.apache.ibatis.annotations.Param;
 
 /**
  * <p></p>
+ *
  * @Author wilian.peng
  * @Date 2021/1/26 18:35
  * @Version 1.0
@@ -26,9 +27,12 @@ public interface PushLogMapper {
 
     /**
      * 查询最后一次推送记录
+     *
      * @param configId
      * @param pushContent
      * @return
      */
-    PushLog findLastRecord(@Param("configId")Integer configId, @Param("pushContent") Integer pushContent);
+    PushLog findLastRecord(@Param("configId") Integer configId, @Param("pushContent") Integer pushContent);
+
+    PushLog findLastRecordBycustomerId(@Param("customerId") Integer customerId);
 }

+ 5 - 0
meter-reading-job/src/main/java/com/huaxu/zoniot/entity/PushLog.java

@@ -26,6 +26,11 @@ public class PushLog {
     */
     private Integer pushConfigId;
 
+    /**
+     * 客户
+     */
+    private Integer customerId;
+
     /**
     * 推送时间
     */

+ 29 - 0
meter-reading-job/src/main/java/com/huaxu/zoniot/job/BuildingSyncJob.java

@@ -0,0 +1,29 @@
+package com.huaxu.zoniot.job;
+
+import com.alibaba.fastjson.JSON;
+import com.huaxu.zoniot.service.BuildingSyncService;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import com.xxl.job.core.log.XxlJobLogger;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+
+@Slf4j
+@Component
+public class BuildingSyncJob {
+    @Autowired
+    private BuildingSyncService buildingSyncService;
+
+    @XxlJob("deviceSyncJobHandler")
+    public ReturnT<String> deviceSyncJobHandler(String param) throws Exception {
+        XxlJobLogger.log("XXL-JOB, device Sync Job.Param = {}",param);
+        Map<String,Object> paramMap = JSON.parseObject(param , Map.class);
+        int total = buildingSyncService.rabbitSync();
+        XxlJobLogger.log("XXL-JOB, Meter File Sync Job Finished. Total = {}",total);
+        return ReturnT.SUCCESS;
+    }
+}

+ 30 - 0
meter-reading-job/src/main/java/com/huaxu/zoniot/job/CommunitySyncJob.java

@@ -0,0 +1,30 @@
+package com.huaxu.zoniot.job;
+
+import com.alibaba.fastjson.JSON;
+import com.huaxu.zoniot.service.CommunitySyncService;
+import com.xxl.job.core.biz.model.ReturnT;
+import com.xxl.job.core.handler.annotation.XxlJob;
+import com.xxl.job.core.log.XxlJobLogger;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.Map;
+
+
+@Slf4j
+@Component
+public class CommunitySyncJob {
+
+    @Autowired
+    private CommunitySyncService communitySyncService;
+
+    @XxlJob("deviceSyncJobHandler")
+    public ReturnT<String> deviceSyncJobHandler(String param) throws Exception {
+        XxlJobLogger.log("XXL-JOB, device Sync Job.Param = {}",param);
+        Map<String,Object> paramMap = JSON.parseObject(param , Map.class);
+        int total = communitySyncService.rabbitSync();
+        XxlJobLogger.log("XXL-JOB, Meter File Sync Job Finished. Total = {}",total);
+        return ReturnT.SUCCESS;
+    }
+}

+ 2 - 2
meter-reading-job/src/main/java/com/huaxu/zoniot/job/MeterErrorDaysJob.java

@@ -22,11 +22,11 @@ public class MeterErrorDaysJob {
     WaterMeterErrorDaysService  waterMeterErrorDaysService ;
 
 
-    @XxlJob("clearWaterMeterErrorDaysJob")
+    /*@XxlJob("clearWaterMeterErrorDaysJob")
     public ReturnT<String> clearWaterMeterErrorDaysJobHandler(String param) throws Exception {
         XxlJobLogger.log("XXL-JOB, Clear Water Meter Error Days Job .Param = {}",param);
         int result = waterMeterErrorDaysService.clearErrorDays();
         XxlJobLogger.log("XXL-JOB, Clear Water Meter Error Days Job .Total = {}", result);
         return ReturnT.SUCCESS;
-    }
+    }*/
 }

+ 9 - 0
meter-reading-job/src/main/java/com/huaxu/zoniot/job/MeterFileSyncJob.java

@@ -45,4 +45,13 @@ public class MeterFileSyncJob {
             return ReturnT.SUCCESS;
         }
     }
+
+    @XxlJob("deviceSyncJobHandler")
+    public ReturnT<String> deviceSyncJobHandler(String param) throws Exception {
+        XxlJobLogger.log("XXL-JOB, device Sync Job.Param = {}",param);
+        Map<String,Object> paramMap = JSON.parseObject(param , Map.class);
+        int total = meterFileSyncService.rabbitSync();
+        XxlJobLogger.log("XXL-JOB, Meter File Sync Job Finished. Total = {}",total);
+        return ReturnT.SUCCESS;
+    }
 }

+ 5 - 0
meter-reading-job/src/main/java/com/huaxu/zoniot/service/BuildingSyncService.java

@@ -0,0 +1,5 @@
+package com.huaxu.zoniot.service;
+
+public interface BuildingSyncService {
+    int rabbitSync();
+}

+ 5 - 0
meter-reading-job/src/main/java/com/huaxu/zoniot/service/CommunitySyncService.java

@@ -0,0 +1,5 @@
+package com.huaxu.zoniot.service;
+
+public interface CommunitySyncService {
+    int rabbitSync();
+}

+ 7 - 0
meter-reading-job/src/main/java/com/huaxu/zoniot/service/MeterFileSyncService.java

@@ -25,4 +25,11 @@ public interface MeterFileSyncService {
      * @return
      */
     int fuJinSync(Map<String,Object> param);
+
+
+    /**
+     * 津南养老项目设备rabbit同步
+     * @return
+     */
+    int rabbitSync();
 }

+ 86 - 0
meter-reading-job/src/main/java/com/huaxu/zoniot/service/impl/BuildingSyncServiceImpl.java

@@ -0,0 +1,86 @@
+package com.huaxu.zoniot.service.impl;
+
+import cn.hutool.core.date.DateUtil;
+import com.alibaba.fastjson.JSON;
+import com.huaxu.zoniot.dao.CustomerMapper;
+import com.huaxu.zoniot.dao.PushLogMapper;
+import com.huaxu.zoniot.entity.PushLog;
+import com.huaxu.zoniot.model.DeviceData;
+import com.huaxu.zoniot.service.BuildingSyncService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+@Slf4j
+@Service
+public class BuildingSyncServiceImpl implements BuildingSyncService {
+    @Autowired
+    PushLogMapper pushLogMapper;
+    @Autowired
+    CustomerMapper customerMapper;
+
+    @Resource
+    private RabbitTemplate rabbitTemplate;
+    @Value("${spring.rabbitmq.exchange}")
+    private String exchange;
+    @Value("${spring.rabbitmq.building.queue}")
+    private String queue;
+
+    @Override
+    public int rabbitSync() {
+        List<Integer> customerIds = new ArrayList<>();
+        customerIds.add(54);
+
+        if (customerIds != null && customerIds.size() > 0) {
+            for (Integer customerId : customerIds) {
+                // 1,查询推送记录,1档案;2数据
+                PushLog lastLog = pushLogMapper.findLastRecordBycustomerId(customerId);
+                // 2,计算推送数据时间;如果为首次推送,则判断是否需要将历史数据推送给对方;
+                Date beginDate = null ;
+                Date endDate = new Date();
+                Integer status = null ;
+                if(lastLog != null) {
+                    beginDate = lastLog.getPushTime() ;
+                }
+                else {
+                    // 首次推送,只推送有效的设备
+                    status = 1 ;
+                    beginDate = DateUtil.parse("1900-01-01", "yyyy-MM-dd");
+                }
+                // 3,根据推送数据时间进行数据查询
+                List<Integer> cIds = customerMapper.getSubId(customerId);
+                String pushCustomers = StringUtils.join(cIds, ",");
+                String pushChannels = "40,55";
+                log.info("begin query push device list , beginDate={}, endDate={}",
+                        beginDate, endDate);
+                /*List<DeviceData> dataList = waterMeterMapper.getPushMeterListV2(pushCustomers,
+                        pushChannels,
+                        status,
+                        beginDate,
+                        endDate);
+
+                log.info("end query push device list , size = {}",dataList.size());
+                if(dataList.size() != 0) {
+                    sendRabbitData(dataList);
+                }*/
+            }
+        }
+        return 0;
+    }
+
+    private void sendRabbitData(List<DeviceData> dataList) {
+        for (DeviceData data : dataList) {
+            String msg = JSON.toJSONString(data);
+            log.info("transfer mq msg:{}",msg);
+            //rabbitTemplate.convertAndSend(exchange, queue,msg.getBytes(Charset.forName("UTF-8")));
+        }
+    }
+}

+ 86 - 0
meter-reading-job/src/main/java/com/huaxu/zoniot/service/impl/CommunitySyncServiceImpl.java

@@ -0,0 +1,86 @@
+package com.huaxu.zoniot.service.impl;
+
+import cn.hutool.core.date.DateUtil;
+import com.alibaba.fastjson.JSON;
+import com.huaxu.zoniot.dao.CustomerMapper;
+import com.huaxu.zoniot.dao.PushLogMapper;
+import com.huaxu.zoniot.entity.PushLog;
+import com.huaxu.zoniot.model.DeviceData;
+import com.huaxu.zoniot.service.CommunitySyncService;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.Resource;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+
+@Slf4j
+@Service
+public class CommunitySyncServiceImpl implements CommunitySyncService {
+
+    @Autowired
+    PushLogMapper pushLogMapper;
+    @Autowired
+    CustomerMapper customerMapper;
+    @Resource
+    private RabbitTemplate rabbitTemplate;
+    @Value("${spring.rabbitmq.exchange}")
+    private String exchange;
+    @Value("${spring.rabbitmq.community.queue}")
+    private String queue;
+
+    @Override
+    public int rabbitSync() {
+        List<Integer> customerIds = new ArrayList<>();
+        customerIds.add(54);
+
+        if (customerIds != null && customerIds.size() > 0) {
+            for (Integer customerId : customerIds) {
+                // 1,查询推送记录,1档案;2数据
+                PushLog lastLog = pushLogMapper.findLastRecordBycustomerId(customerId);
+                // 2,计算推送数据时间;如果为首次推送,则判断是否需要将历史数据推送给对方;
+                Date beginDate = null ;
+                Date endDate = new Date();
+                Integer status = null ;
+                if(lastLog != null) {
+                    beginDate = lastLog.getPushTime() ;
+                }
+                else {
+                    // 首次推送,只推送有效的设备
+                    status = 1 ;
+                    beginDate = DateUtil.parse("1900-01-01", "yyyy-MM-dd");
+                }
+                // 3,根据推送数据时间进行数据查询
+                List<Integer> cIds = customerMapper.getSubId(customerId);
+                String pushCustomers = StringUtils.join(cIds, ",");
+                String pushChannels = "40,55";
+                log.info("begin query push device list , beginDate={}, endDate={}",
+                        beginDate, endDate);
+                /*List<DeviceData> dataList = waterMeterMapper.getPushMeterListV2(pushCustomers,
+                        pushChannels,
+                        status,
+                        beginDate,
+                        endDate);
+
+                log.info("end query push device list , size = {}",dataList.size());
+                if(dataList.size() != 0) {
+                    sendRabbitData(dataList);
+                }*/
+            }
+        }
+        return 0;
+    }
+
+    private void sendRabbitData(List<DeviceData> dataList) {
+        for (DeviceData data : dataList) {
+            String msg = JSON.toJSONString(data);
+            log.info("transfer mq msg:{}",msg);
+            //rabbitTemplate.convertAndSend(exchange, queue,msg.getBytes(Charset.forName("UTF-8")));
+        }
+    }
+}

+ 66 - 4
meter-reading-job/src/main/java/com/huaxu/zoniot/service/impl/MeterFileSyncServiceImpl.java

@@ -4,11 +4,9 @@ import cn.hutool.core.date.DateUtil;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
+import com.huaxu.zoniot.dao.*;
+import com.huaxu.zoniot.model.DeviceData;
 import com.huaxu.zoniot.utils.HttpClientUtils;
-import com.huaxu.zoniot.dao.DeviceInfoPushConfigMapper;
-import com.huaxu.zoniot.dao.MeterInfoMapper;
-import com.huaxu.zoniot.dao.PushLogMapper;
-import com.huaxu.zoniot.dao.WaterMeterMapper;
 import com.huaxu.zoniot.entity.DeviceInfoPushConfig;
 import com.huaxu.zoniot.entity.MeterInfo;
 import com.huaxu.zoniot.entity.PushLog;
@@ -17,9 +15,13 @@ import com.huaxu.zoniot.utils.Sha256Util;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.Resource;
+import java.nio.charset.Charset;
 import java.util.*;
 
 /**
@@ -43,6 +45,14 @@ public class MeterFileSyncServiceImpl  implements MeterFileSyncService {
 
     @Autowired
     MeterInfoMapper meterInfoMapper;
+    @Autowired
+    CustomerMapper customerMapper;
+    @Resource
+    private RabbitTemplate rabbitTemplate;
+    @Value("${spring.rabbitmq.exchange}")
+    private String exchange;
+    @Value("${spring.rabbitmq.device.queue}")
+    private String queue;
 
     @Override
     public int httpSync(Integer configId) {
@@ -259,4 +269,56 @@ public class MeterFileSyncServiceImpl  implements MeterFileSyncService {
         if(code == 0 ) {isSuccess = true ;}
         return isSuccess;
     }
+
+    @Override
+    public int rabbitSync() {
+
+        List<Integer> customerIds = new ArrayList<>();
+        customerIds.add(54);
+
+        if (customerIds != null && customerIds.size() > 0) {
+            for (Integer customerId : customerIds) {
+                // 1,查询推送记录,1档案;2数据
+                PushLog lastLog = pushLogMapper.findLastRecordBycustomerId(customerId);
+                // 2,计算推送数据时间;如果为首次推送,则判断是否需要将历史数据推送给对方;
+                Date beginDate = null ;
+                Date endDate = new Date();
+                Integer status = null ;
+                if(lastLog != null) {
+                    beginDate = lastLog.getPushTime() ;
+                }
+                else {
+                    // 首次推送,只推送有效的设备
+                    status = 1 ;
+                    beginDate = DateUtil.parse("1900-01-01", "yyyy-MM-dd");
+                }
+                // 3,根据推送数据时间进行数据查询
+                List<Integer> cIds = customerMapper.getSubId(customerId);
+                String pushCustomers = StringUtils.join(cIds, ",");
+                String pushChannels = "40,55";
+                log.info("begin query push device list , beginDate={}, endDate={}",
+                         beginDate, endDate);
+                List<DeviceData> dataList = waterMeterMapper.getPushMeterListV2(pushCustomers,
+                        pushChannels,
+                        status,
+                        beginDate,
+                        endDate);
+
+                log.info("end query push device list , size = {}",dataList.size());
+                if(dataList.size() != 0) {
+                    sendRabbitData(dataList);
+                }
+            }
+        }
+
+        return 0;
+    }
+
+    private void sendRabbitData(List<DeviceData> dataList) {
+        for (DeviceData data : dataList) {
+            String msg = JSON.toJSONString(data);
+            log.info("transfer mq msg:{}",msg);
+            //rabbitTemplate.convertAndSend(exchange, queue,msg.getBytes(Charset.forName("UTF-8")));
+        }
+    }
 }

+ 31 - 0
meter-reading-job/src/main/java/com/huaxu/zoniot/web/TestController.java

@@ -0,0 +1,31 @@
+package com.huaxu.zoniot.web;
+
+import com.huaxu.zoniot.job.MeterFileSyncJob;
+import io.swagger.annotations.Api;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+@Api(tags = "测试")
+@RequestMapping("test")
+@Slf4j
+public class TestController {
+
+    @Autowired
+    private MeterFileSyncJob meterFileSyncJob;
+
+    @GetMapping(value = "pushDevice")
+    public String pushDevice(
+
+    ) {
+        try {
+            meterFileSyncJob.deviceSyncJobHandler(null);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return "ok";
+    }
+}

+ 11 - 7
meter-reading-job/src/main/resources/application-job-dev.properties

@@ -2,7 +2,7 @@ server.port=8081
 server.servlet.context-path=/meter/reading/job
 logging.level.root=info
 logging.file.path=./logs
-#########################################XXL Job配置#################################################
+#########################################XXL Job锟斤拷锟斤拷#################################################
 xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
 xxl.job.accessToken=
 xxl.job.executor.appname=meter-reading-executor
@@ -11,20 +11,24 @@ xxl.job.executor.ip=
 xxl.job.executor.port=9995
 xxl.job.executor.logpath=C:/tmp/xxl-job/jobhandler
 xxl.job.executor.logretentiondays=30
-#########################################Rabbit MQ 配置#############################################
+#########################################Rabbit MQ 锟斤拷锟斤拷#############################################
 spring.rabbitmq.host=114.135.61.188
 spring.rabbitmq.port=55672
 spring.rabbitmq.username=zoniot
 spring.rabbitmq.password=zcxk100
 spring.rabbitmq.virtual-host=/
 spring.rabbitmq.connection-timeout=1000ms
-# 开启发送确认
+# 锟斤拷锟斤拷锟斤拷锟斤拷确锟斤拷
 spring.rabbitmq.publisher-confirm-type=correlated
-# 开启发送失败退回
+# 锟斤拷锟斤拷锟斤拷锟斤拷失锟斤拷锟剿伙拷
 spring.rabbitmq.publisher-returns=true
 spring.rabbitmq.template.mandatory=true
-# 开启ACK
+# 锟斤拷锟斤拷ACK
 spring.rabbitmq.listener.direct.acknowledge-mode=manual
 spring.rabbitmq.listener.simple.acknowledge-mode=manual
-# 任务队列
-job.task.rabbit.exchange=job-task-exchange
+# 锟斤拷锟斤拷锟斤拷锟�
+job.task.rabbit.exchange=job-task-exchange
+spring.rabbitmq.exchange=batch-handler-exchange
+spring.rabbitmq.device.queue=sync-device-handler-queue
+spring.rabbitmq.community.queue=sync-community-handler-queue
+spring.rabbitmq.building.queue=sync-building-handler-queue

+ 10 - 0
meter-reading-job/src/main/resources/mapper/PushLogMapper.xml

@@ -163,4 +163,14 @@
     and push_config_id = #{configId}
     order by push_time desc limit 1
   </select>
+
+  <select id="findLastRecordBycustomerId" resultMap="BaseResultMap">
+    select
+    <include refid="Base_Column_List" />
+    from
+    sc_push_log
+    where
+    customer_id = #{customerId}
+    order by push_time desc limit 1
+    </select>
 </mapper>

+ 29 - 5
meter-reading-wugang/src/main/java/com/zoniot/wg/controller/ExternalApiController.java

@@ -1,5 +1,6 @@
 package com.zoniot.wg.controller;
 
+import com.alibaba.fastjson.JSON;
 import com.zoniot.wg.dao.DeviceMapper;
 import com.zoniot.wg.dao.MeterReadRecordMapper;
 import com.zoniot.wg.dto.RequestBodyDto;
@@ -50,11 +51,11 @@ public class ExternalApiController {
     public RequestDto updateUserCode(
             @ApiParam(value = "结果", required = true) @RequestBody(required = true) RequestDto result
     ) {
+        log.info("updateUserCode result = " + result);
         RequestDto requestDto = new RequestDto();
         requestDto.setHead(result.getHead());
         requestDto.getHead().setTime(LocalDateTime.now().format(df));
         try {
-            log.info("updateUserCode result = " + result);
             Map<String,Object> map = (LinkedHashMap<String, Object>) result.getBody();
             List<LinkedHashMap<String, Object>> list = (List<LinkedHashMap<String, Object>>) map.get("request");
             if (list != null && list.size() > 0) {
@@ -80,11 +81,14 @@ public class ExternalApiController {
 
                 }
             }
+
             RequestBodyDto requestBodyDto = new RequestBodyDto<>();
             requestBodyDto.setState("success");
             requestDto.setBody(requestBodyDto);
+            log.info("updateUserCode success {}",JSON.toJSONString(requestBodyDto));
             return requestDto;
         }catch (Exception e){
+            log.info("Exception {}", JSON.toJSONString(e));
             RequestBodyDto requestBodyDto = new RequestBodyDto<>();
             requestBodyDto.setState("fail");
             requestBodyDto.setErrMsg(e.getMessage());
@@ -101,11 +105,12 @@ public class ExternalApiController {
     public RequestDto getMeterReadList(
             @ApiParam(value = "结果", required = true) @RequestBody(required = true) RequestDto result
     ) {
+        log.info("getMeterReadList result = " + result);
         RequestDto requestDto = new RequestDto();
         requestDto.setHead(result.getHead());
         requestDto.getHead().setTime(LocalDateTime.now().format(df));
         try {
-            log.info("getMeterReadList result = " + result);
+
             Map<String,Object> body = (LinkedHashMap<String, Object>) result.getBody();
             ArrayList<Object> request = (ArrayList<Object>) body.get("request");
             List<String> meterNos = new ArrayList<>();
@@ -122,12 +127,15 @@ public class ExternalApiController {
             Integer date = Integer.valueOf(LocalDate.now().plusDays(-1).format(DateTimeFormatter.ofPattern("yyyyMMdd")));;
 
             List<RequestMaterInfo> list = this.getReadingMeter(customerId,date,meterNos);
+
             RequestBodyDto requestBodyDto = new RequestBodyDto<>();
             requestBodyDto.setState("success");
             requestBodyDto.setResult(list);
             requestDto.setBody(requestBodyDto);
+            log.info("getMeterReadList success");
             return requestDto;
         }catch (Exception e){
+            log.info("Exception {}",JSON.toJSONString(e));
             RequestBodyDto requestBodyDto = new RequestBodyDto<>();
             requestBodyDto.setState("fail");
             requestBodyDto.setErrMsg(e.getMessage());
@@ -138,17 +146,33 @@ public class ExternalApiController {
     }
 
     private List<RequestMaterInfo> getReadingMeter(Integer customerId, Integer date, List<String> meterNos) {
-        //List<RequestMaterInfo> list = meterReadRecordMapper.getReadingMeter(customerId,date,meterNo);
+        //List<RequestMaterInfo> list = meterReadRecordMapper.getReadingMeterV2(customerId,date,meterNos);
+        List<Device> deviceList = deviceMapper.getList(customerId,meterNos);
+        List<Long> deviceIdList = new ArrayList<>();
+        for (Device device : deviceList) {
+            deviceIdList.add(device.getId());
+        }
 
         List<RequestMaterInfo> list = new ArrayList<>();
         List<MeterReadRecord> readRecordList = new ArrayList<>();
-        if (meterNos == null || meterNos.size() == 0) {
+        if (deviceIdList == null || deviceIdList.size() == 0) {
             readRecordList = meterReadRecordRepository.findByReadDateAndCustomerIdAndReadStatus(date,customerId,"2");
         }else {
-            readRecordList = meterReadRecordRepository.findByReadDateAndCustomerIdAndReadStatusAndMeterFileNoIn(date,customerId,"2",meterNos);
+            readRecordList = meterReadRecordRepository.findByReadDateAndCustomerIdAndReadStatusAndDeviceIdIn(date,customerId,"2",deviceIdList);
         }
+
+
+
         if (readRecordList != null && readRecordList.size() > 0) {
             for (MeterReadRecord meterReadRecord : readRecordList) {
+                for (Device device : deviceList) {
+                    if(meterReadRecord.getDeviceId().equals(device.getId())){
+                        meterReadRecord.setMeterFileNo(device.getWaterMeterFileNo());
+                        meterReadRecord.setMeterNo(device.getWaterMeterNo());
+                        continue;
+                    }
+                }
+
                 RequestMaterInfo requestMaterInfo = new RequestMaterInfo();
                 requestMaterInfo.setCiid(meterReadRecord.getMeterFileNo());
                 requestMaterInfo.setMiid(meterReadRecord.getMeterNo());

+ 3 - 0
meter-reading-wugang/src/main/java/com/zoniot/wg/dao/DeviceMapper.java

@@ -9,5 +9,8 @@ import java.util.List;
 @Mapper
 public interface DeviceMapper {
     int updateByPrimaryKeySelective(@Param("device") Device device);
+
     List<Long> findDeviceIdByWaterMeterNo(@Param("customerId") Integer customerId, @Param("meterNo") String meterNo);
+
+    List<Device> getList(@Param("customerId") Integer customerId, @Param("meterNos") List<String> meterNos);
 }

+ 1 - 0
meter-reading-wugang/src/main/java/com/zoniot/wg/dao/MeterReadRecordMapper.java

@@ -9,4 +9,5 @@ import java.util.List;
 @Mapper
 public interface MeterReadRecordMapper {
     List<RequestMaterInfo> getReadingMeter(@Param("customerId") Integer customerId, @Param("date") Integer date, @Param("meterNo") String meterNo);
+    List<RequestMaterInfo> getReadingMeterV2(@Param("customerId") Integer customerId, @Param("date") Integer date, @Param("meterNos") List<String> meterNos);
 }

+ 2 - 0
meter-reading-wugang/src/main/java/com/zoniot/wg/repository/MeterReadRecordRepository.java

@@ -10,4 +10,6 @@ public interface MeterReadRecordRepository extends MongoRepository<MeterReadReco
     List<MeterReadRecord> findByReadDateAndCustomerIdAndReadStatus(Integer date, Integer customerId,String readStatus);
 
     List<MeterReadRecord> findByReadDateAndCustomerIdAndReadStatusAndMeterFileNoIn(Integer date, Integer customerId,String readStatus, List<String> meterNos);
+
+    List<MeterReadRecord> findByReadDateAndCustomerIdAndReadStatusAndDeviceIdIn(Integer date, Integer customerId,String readStatus, List<Long> deviceIds);
 }

+ 8 - 0
meter-reading-wugang/src/main/resources/mapper/DeviceMapper.xml

@@ -37,5 +37,13 @@
         </set>
         WHERE id = #{device.id,jdbcType=BIGINT}
     </update>
+
+    <select id="getList" resultType="com.zoniot.wg.entity.Device">
+        select id,water_meter_no,water_meter_file_no,device_no from sc_device where status = 1
+        and customer_id = #{customerId}
+        <if test="meterNos != null and meterNos.size() != 0">
+            and water_meter_file_no in <foreach collection="meterNos" item="item" open="(" separator="," close=")">#{item}</foreach>
+        </if>
+    </select>
 </mapper>
 

+ 17 - 1
meter-reading-wugang/src/main/resources/mapper/MeterReadRecordMapper.xml

@@ -23,6 +23,22 @@
         </if>
     </select>
 
-
+    <select id="getReadingMeterV2" resultType="com.zoniot.wg.dto.RequestMaterInfo">
+        select
+        meter_file_no as ciid,
+        water_meter_no as miid,
+        read_data as mrecode,
+        read_time as mrrdate,
+        now() as mrinputdate,
+        'xt' as mrinputper,
+        'Y' as mrreadok,
+        0 as mrface
+        from sc_device sd
+        where status = 1
+        and customer_id = #{customerId}
+        <if test="meterNos != null and meterNos.size() != 0">
+            and water_meter_no in <foreach collection="meterNos" item="item" open="(" separator="," close=")">#{item}</foreach>
+        </if>
+    </select>
 </mapper>