Browse Source

数据同步

lin 4 years ago
parent
commit
0b094871da

+ 61 - 0
src/main/java/com/zcxk/syncdata/config/RabbitConfig.java

@@ -0,0 +1,61 @@
+package com.zcxk.syncdata.config;
+
+
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.FanoutExchange;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+
+/**
+ * <p>RabbitMQ配置</p>
+ *
+ * @Author wilian.peng
+ * @Date 2020/4/7 11:50
+ * @Version 1.0
+ */
+@Configuration
+public class RabbitConfig {
+
+    @Value("${spring.rabbitmq.exchange}")
+    private String exchange;
+    @Value("${spring.rabbitmq.queue}")
+    private String queue;
+
+    @Bean("customContainerFactory")
+    public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
+        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
+        //设置线程数
+        factory.setConcurrentConsumers(2);
+        //最大线程数
+        factory.setMaxConcurrentConsumers(10);
+        configurer.configure(factory, connectionFactory);
+        return factory;
+    }
+    //创建队列
+    @Bean
+    public Queue createFanoutQueue() {
+        return new Queue(queue);
+    }
+    //创建交换机
+    @Bean
+    public FanoutExchange defFanoutExchange() {
+        return new FanoutExchange(exchange);
+    }
+
+    //队列与交换机进行绑定
+    @Bean
+    Binding bindingFanout() {
+        return BindingBuilder.bind(createFanoutQueue()).
+                to(defFanoutExchange());
+    }
+
+
+
+}

+ 19 - 0
src/main/java/com/zcxk/syncdata/dto/DeviceParams.java

@@ -0,0 +1,19 @@
+package com.zcxk.syncdata.dto;
+
+import lombok.Data;
+
+@Data
+public class DeviceParams {
+    Integer batchId;
+    String meterNo;
+    String fileNo;
+    String concentratorNo;
+    String concentratorName;
+    String channelNumberNo;
+    String channelNumberName;
+    String port;
+    String communityName;
+    String buildingName;
+    String unitName;
+    String address;
+}

+ 41 - 0
src/main/java/com/zcxk/syncdata/rabitmq/BatchImportDeviceReceiver.java

@@ -0,0 +1,41 @@
+package com.zcxk.syncdata.rabitmq;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.rabbitmq.client.Channel;
+import com.zcxk.syncdata.dto.DeviceParams;
+import com.zcxk.syncdata.service.DeviceService;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+
+@Component
+@Slf4j
+public class BatchImportDeviceReceiver {
+
+    @Autowired
+    private DeviceService deviceService;
+
+
+    @RabbitListener(queues = {"${spring.rabbitmq.queue}"},containerFactory = "customContainerFactory")
+    public void receiver(Channel channel, Message message) throws IOException {
+        //String str = new String(body);
+        try {
+        String msg = new String(message.getBody());
+        log.info("-----BatchImportDeviceReceiver-----,"  +msg);
+        DeviceParams deviceParams = JSON.parseObject(msg, DeviceParams.class);
+        // 1,处理消息
+        //deviceService.batchImportDeviceHandler(deviceParams);
+
+        // 2,确认消息消费成功 
+        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
+        } catch (Exception e) {
+            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
+            log.error("consumer message error !",e);
+        }
+    }
+}

+ 4 - 0
src/main/java/com/zcxk/syncdata/service/DeviceService.java

@@ -1,5 +1,7 @@
 package com.zcxk.syncdata.service;
 
+import com.alibaba.fastjson.JSONObject;
+import com.zcxk.syncdata.dto.DeviceParams;
 import com.zcxk.syncdata.entity.Device;
 import org.springframework.web.multipart.MultipartFile;
 
@@ -22,6 +24,8 @@ public interface DeviceService {
     int batchInsert(List<Device> list);
 
     int importRemoteMeterExcel(MultipartFile file);
+
+    void batchImportDeviceHandler(DeviceParams deviceParams);
 }
 
 

+ 170 - 3
src/main/java/com/zcxk/syncdata/service/impl/DeviceServiceImpl.java

@@ -1,7 +1,10 @@
 package com.zcxk.syncdata.service.impl;
 
 import com.alibaba.excel.EasyExcel;
-import com.zcxk.syncdata.dao.UdipUnitRepository;
+import com.alibaba.fastjson.JSONObject;
+import com.zcxk.syncdata.dao.*;
+import com.zcxk.syncdata.dto.DeviceParams;
+import com.zcxk.syncdata.entity.*;
 import com.zcxk.syncdata.util.SnowflakeIdWorker;
 import com.zoniot.platform.unit.management.model.UdipUnit;
 import com.zcxk.syncdata.util.Util;
@@ -12,8 +15,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Service;
 import javax.annotation.Resource;
-import com.zcxk.syncdata.entity.Device;
-import com.zcxk.syncdata.dao.DeviceMapper;
+
 import com.zcxk.syncdata.service.DeviceService;
 import org.springframework.web.multipart.MultipartFile;
 
@@ -40,6 +42,18 @@ public class DeviceServiceImpl implements DeviceService {
     UdipUnitRepository udipUnitRepository;
     @Autowired
     private SnowflakeIdWorker idWorker;
+    @Resource
+    private ConcentratorMapper concentratorMapper;
+    @Resource
+    private ChannelNumberMapper channelNumberMapper;
+    @Resource
+    private CommunityMapper communityMapper;
+    @Resource
+    private BuildingMapper buildingMapper;
+    @Resource
+    private WaterRelatedDeviceMapper waterRelatedDeviceMapper;
+    @Resource
+    private DeviceDimensionMapper deviceDimensionMapper;
     @Autowired
     private RemoteMeterDataListener remoteMeterDataListener;
 
@@ -204,6 +218,159 @@ public class DeviceServiceImpl implements DeviceService {
         }
         return 0;
     }
+
+    @Override
+    public void batchImportDeviceHandler(DeviceParams deviceParams) {
+        Integer siteId = 1;
+        Integer customerId = 136;
+        Integer province = 420000;
+        Integer city = 420100;
+        Integer region = 420107;
+        Integer deviceTypeId = 19;
+        String ip = "58.250.37.159";
+        Integer port = 7000;
+
+
+        Community community = communityMapper.findByNameV2(siteId,province,city,region,deviceParams.getCommunityName());
+        if (community == null) {
+            community = new Community();
+            community.setSiteId(siteId);
+            community.setCustomerId(customerId);
+            community.setName(deviceParams.getCommunityName());
+            community.setProvince(province);
+            community.setCity(city);
+            community.setRegion(region);
+            community.setAddress(deviceParams.getCommunityName());
+            community.setStatus(1);
+            community.setCreateBy("system");
+            community.setUpdateBy("system");
+            community.setDateCreate(LocalDateTime.now());
+            community.setDateUpdate(LocalDateTime.now());
+            community.setCode(String.format("%03d",Integer.valueOf("1")));
+            communityMapper.insertSelective(community);
+        }
+
+        String buildingName = "";
+        if(!"NULL".equals(deviceParams.getUnitName())){
+            buildingName = deviceParams.getBuildingName() + deviceParams.getUnitName();
+        }
+        Building building = buildingMapper.findByNameV2(siteId,community.getId(),buildingName);
+        if (building == null) {
+            building = new Building();
+            building.setSiteId(siteId);
+            building.setCommunity(community.getId());
+            building.setName(buildingName);
+            building.setProvince(province);
+            building.setCity(city);
+            building.setRegion(region);
+            building.setAddress(community.getName()+buildingName);
+            building.setStatus(1);
+            building.setCreateBy("system");
+            building.setUpdateBy("system");
+            building.setCreateDate(LocalDateTime.now());
+            building.setUpdateDate(LocalDateTime.now());
+            buildingMapper.insertSelective(building);
+        }
+
+        Concentrator concentrator = concentratorMapper.findName(siteId,deviceParams.getConcentratorNo());
+        if (concentrator == null) {
+            concentrator = new Concentrator();
+            concentrator.setSiteId(siteId);
+            concentrator.setCustomerId(customerId);
+            concentrator.setSerialNumber(deviceParams.getConcentratorNo());
+            concentrator.setName(deviceParams.getConcentratorName());
+            concentrator.setAddress(community.getName());
+            concentrator.setDeviceType(deviceTypeId);
+            concentrator.setBaudRate(1);
+            concentrator.setIp(ip);
+            concentrator.setPort(port);
+            concentrator.setSimNo("1");
+            concentrator.setBuildingId(building.getId());
+            concentrator.setProvince(province);
+            concentrator.setCity(city);
+            concentrator.setRegion(region);
+            concentrator.setDeviceStatus(0);
+            concentrator.setStatus(1);
+            concentrator.setCreateBy("system");
+            concentrator.setUpdateBy("system");
+            concentrator.setDateCreate(LocalDateTime.now());
+            concentrator.setDateUpdate(LocalDateTime.now());
+            concentratorMapper.insertSelective(concentrator);
+        }
+        ChannelNumber channelNumber = null;
+        if(!"NULL".equals(deviceParams.getChannelNumberNo())){
+            channelNumber = channelNumberMapper.findName(concentrator.getId(),deviceParams.getChannelNumberName());
+            if (channelNumber == null) {
+                channelNumber = new ChannelNumber();
+                channelNumber.setSiteId(siteId);
+                channelNumber.setConcentratorId(concentrator.getId());
+                channelNumber.setChannelName(deviceParams.getChannelNumberName());
+                channelNumber.setChannelNo(deviceParams.getChannelNumberNo());
+                channelNumber.setStatus(1);
+                channelNumber.setCreateBy("system");
+                channelNumber.setUpdateBy("system");
+                channelNumber.setDateCreate(LocalDateTime.now());
+                channelNumber.setDateUpdate(LocalDateTime.now());
+                channelNumberMapper.insertSelective(channelNumber);
+            }
+        }
+
+        Device deviceTemp = deviceMapper.findByMeterNo(deviceParams.getMeterNo());
+        if (deviceTemp == null) {
+            Device device = new Device();
+            device.setId(idWorker.nextId());
+            device.setWaterMeterNo(deviceParams.getMeterNo());
+            device.setWaterMeterFileNo(deviceParams.getFileNo());
+            device.setDeviceType(deviceTypeId);
+            device.setSiteId(siteId);
+            device.setSysId(55);
+            device.setBuildingId(building.getId());
+            device.setLocDesc(deviceParams.getAddress());
+            device.setCustomerId(customerId);
+            device.setManufacturerId(8);
+            device.setDeviceStatus("5");
+            device.setStatus(true);
+            device.setCreateBy("system");
+            device.setUpdateBy("system");
+            device.setDateCreate(LocalDateTime.now());
+            device.setDateUpdate(LocalDateTime.now());
+            deviceMapper.insertSelective(device);
+
+            //2、同步设备维度关系
+            DeviceDimension deviceDimension = new DeviceDimension();
+            deviceDimension.setDeviceId(device.getId());
+            deviceDimension.setProvince(String.valueOf(building.getProvince()));
+            deviceDimension.setCity(String.valueOf(building.getCity()));
+            deviceDimension.setRegion(String.valueOf(building.getRegion()));
+            deviceDimension.setCommunity(String.valueOf(building.getCommunity()));
+            deviceDimension.setBuilding(String.valueOf(building.getId()));
+            deviceDimension.setBuilding(String.valueOf(building.getId()));
+            deviceDimension.setDevice(device.getDeviceNo());
+            deviceDimension.setCustomer(String.valueOf(device.getCustomerId()));
+            deviceDimension.setStatus(1);
+            deviceDimension.setCreateBy("system");
+            deviceDimension.setUpdateBy("system");
+            deviceDimension.setDateCreate(LocalDateTime.now());
+            deviceDimension.setDateUpdate(LocalDateTime.now());
+            deviceDimensionMapper.insertSelective(deviceDimension);
+
+            //关系
+            WaterRelatedDevice waterRelatedDevice = new WaterRelatedDevice();
+            waterRelatedDevice.setDeviceId(device.getId());
+            waterRelatedDevice.setConcentratorId(concentrator.getId());
+            if (channelNumber != null) {
+                waterRelatedDevice.setChannelNumberId(channelNumber.getId());
+            }
+            waterRelatedDevice.setPort(deviceParams.getPort());
+            waterRelatedDevice.setIssueStatus(1);
+            waterRelatedDevice.setStatus(1);
+            waterRelatedDevice.setDateCreate(LocalDateTime.now());
+            waterRelatedDevice.setDateUpdate(LocalDateTime.now());
+            waterRelatedDevice.setCreateBy("system");
+            waterRelatedDevice.setUpdateBy("system");
+            waterRelatedDeviceMapper.insertSelective(waterRelatedDevice);
+        }
+    }
 }
 
 

+ 27 - 138
src/main/java/com/zcxk/syncdata/service/impl/RemoteMeterDataListener.java

@@ -7,14 +7,18 @@ import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.zcxk.syncdata.dao.*;
 import com.zcxk.syncdata.dto.BuildingDto;
+import com.zcxk.syncdata.dto.DeviceParams;
 import com.zcxk.syncdata.entity.*;
 import com.zcxk.syncdata.util.SnowflakeIdWorker;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
 import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
 import org.springframework.stereotype.Component;
 
 import javax.annotation.Resource;
+import java.nio.charset.Charset;
 import java.time.LocalDateTime;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +42,12 @@ public class RemoteMeterDataListener extends AnalysisEventListener<Object> {
     private DeviceDimensionMapper deviceDimensionMapper;
     @Autowired
     private SnowflakeIdWorker idWorker;
+    @Resource
+    private RabbitTemplate rabbitTemplate;
+    @Value("${spring.rabbitmq.exchange}")
+    private String exchange;
+    @Value("${spring.rabbitmq.queue}")
+    private String queue;
 
     AtomicInteger success = new AtomicInteger(0);
     AtomicInteger fail = new AtomicInteger(0);
@@ -66,145 +76,24 @@ public class RemoteMeterDataListener extends AnalysisEventListener<Object> {
         String unitName = map.get(9).trim();
         String address = map.get(10).trim();
 
-        Community community = communityMapper.findByNameV2(siteId,province,city,region,communityName);
-        if (community == null) {
-            community = new Community();
-            community.setSiteId(siteId);
-            community.setCustomerId(customerId);
-            community.setName(communityName);
-            community.setProvince(province);
-            community.setCity(city);
-            community.setRegion(region);
-            community.setAddress(communityName);
-            community.setStatus(1);
-            community.setCreateBy("system");
-            community.setUpdateBy("system");
-            community.setDateCreate(LocalDateTime.now());
-            community.setDateUpdate(LocalDateTime.now());
-            community.setCode(String.format("%03d",Integer.valueOf("1")));
-            communityMapper.insertSelective(community);
-        }
-
-
-        if(!"NULL".equals(unitName)){
-            buildingName = buildingName + unitName;
-        }
-        Building building = buildingMapper.findByNameV2(siteId,community.getId(),buildingName);
-        if (building == null) {
-            building = new Building();
-            building.setSiteId(siteId);
-            building.setCommunity(community.getId());
-            building.setName(buildingName);
-            building.setProvince(province);
-            building.setCity(city);
-            building.setRegion(region);
-            building.setAddress(community.getName()+buildingName);
-            building.setStatus(1);
-            building.setCreateBy("system");
-            building.setUpdateBy("system");
-            building.setCreateDate(LocalDateTime.now());
-            building.setUpdateDate(LocalDateTime.now());
-            buildingMapper.insertSelective(building);
-        }
-
-        Concentrator concentrator = concentratorMapper.findName(siteId,concentratorNo);
-        if (concentrator == null) {
-            concentrator = new Concentrator();
-            concentrator.setSiteId(siteId);
-            concentrator.setCustomerId(customerId);
-            concentrator.setSerialNumber(concentratorNo);
-            concentrator.setName(concentratorName);
-            concentrator.setAddress(community.getName());
-            concentrator.setDeviceType(deviceTypeId);
-            concentrator.setBaudRate(1);
-            concentrator.setIp("58.250.37.159");
-            concentrator.setPort(7000);
-            concentrator.setSimNo("1");
-            concentrator.setBuildingId(building.getId());
-            concentrator.setProvince(province);
-            concentrator.setCity(city);
-            concentrator.setRegion(region);
-            concentrator.setDeviceStatus(0);
-            concentrator.setStatus(1);
-            concentrator.setCreateBy("system");
-            concentrator.setUpdateBy("system");
-            concentrator.setDateCreate(LocalDateTime.now());
-            concentrator.setDateUpdate(LocalDateTime.now());
-            concentratorMapper.insertSelective(concentrator);
-        }
-        ChannelNumber channelNumber = null;
-        if(!"NULL".equals(channelNumberNo)){
-            channelNumber = channelNumberMapper.findName(concentrator.getId(),channelNumberName);
-            if (channelNumber == null) {
-                channelNumber = new ChannelNumber();
-                channelNumber.setSiteId(siteId);
-                channelNumber.setConcentratorId(concentrator.getId());
-                channelNumber.setChannelName(channelNumberName);
-                channelNumber.setChannelNo(channelNumberNo);
-                channelNumber.setStatus(1);
-                channelNumber.setCreateBy("system");
-                channelNumber.setUpdateBy("system");
-                channelNumber.setDateCreate(LocalDateTime.now());
-                channelNumber.setDateUpdate(LocalDateTime.now());
-                channelNumberMapper.insertSelective(channelNumber);
-            }
-        }
-
-        Device deviceTemp = deviceMapper.findByMeterNo(meterNo);
-        if (deviceTemp == null) {
-            Device device = new Device();
-            device.setId(idWorker.nextId());
-            device.setWaterMeterNo(meterNo);
-            device.setWaterMeterFileNo(fileNo);
-            device.setDeviceType(deviceTypeId);
-            device.setSiteId(siteId);
-            device.setSysId(55);
-            device.setBuildingId(building.getId());
-            device.setLocDesc(address);
-            device.setCustomerId(customerId);
-            device.setManufacturerId(8);
-            device.setDeviceStatus("5");
-            device.setStatus(true);
-            device.setCreateBy("system");
-            device.setUpdateBy("system");
-            device.setDateCreate(LocalDateTime.now());
-            device.setDateUpdate(LocalDateTime.now());
-            deviceMapper.insertSelective(device);
-
-            //2、同步设备维度关系
-            DeviceDimension deviceDimension = new DeviceDimension();
-            deviceDimension.setDeviceId(device.getId());
-            deviceDimension.setProvince(String.valueOf(building.getProvince()));
-            deviceDimension.setCity(String.valueOf(building.getCity()));
-            deviceDimension.setRegion(String.valueOf(building.getRegion()));
-            deviceDimension.setCommunity(String.valueOf(building.getCommunity()));
-            deviceDimension.setBuilding(String.valueOf(building.getId()));
-            deviceDimension.setBuilding(String.valueOf(building.getId()));
-            deviceDimension.setDevice(device.getDeviceNo());
-            deviceDimension.setCustomer(String.valueOf(device.getCustomerId()));
-            deviceDimension.setStatus(1);
-            deviceDimension.setCreateBy("system");
-            deviceDimension.setUpdateBy("system");
-            deviceDimension.setDateCreate(LocalDateTime.now());
-            deviceDimension.setDateUpdate(LocalDateTime.now());
-            deviceDimensionMapper.insertSelective(deviceDimension);
+        DeviceParams deviceParams = new DeviceParams();
+        deviceParams.setBatchId(1);
+        deviceParams.setMeterNo(meterNo);
+        deviceParams.setFileNo(fileNo);
+        deviceParams.setConcentratorNo(concentratorNo);
+        deviceParams.setConcentratorName(concentratorName);
+        deviceParams.setChannelNumberNo(channelNumberNo);
+        deviceParams.setChannelNumberName(channelNumberName);
+        deviceParams.setPort(port);
+        deviceParams.setCommunityName(communityName);
+        deviceParams.setBuildingName(buildingName);
+        deviceParams.setUnitName(unitName);
+        deviceParams.setAddress(address);
+
+        String msg = JSON.toJSONString(deviceParams);
+        log.info("transfer mq msg:{}",msg);
+        rabbitTemplate.convertAndSend(exchange, queue,msg.getBytes(Charset.forName("UTF-8")));
 
-            //关系
-            WaterRelatedDevice waterRelatedDevice = new WaterRelatedDevice();
-            waterRelatedDevice.setDeviceId(device.getId());
-            waterRelatedDevice.setConcentratorId(concentrator.getId());
-            if (channelNumber != null) {
-                waterRelatedDevice.setChannelNumberId(channelNumber.getId());
-            }
-            waterRelatedDevice.setPort(port);
-            waterRelatedDevice.setIssueStatus(1);
-            waterRelatedDevice.setStatus(1);
-            waterRelatedDevice.setDateCreate(LocalDateTime.now());
-            waterRelatedDevice.setDateUpdate(LocalDateTime.now());
-            waterRelatedDevice.setCreateBy("system");
-            waterRelatedDevice.setUpdateBy("system");
-            waterRelatedDeviceMapper.insertSelective(waterRelatedDevice);
-        }
 
         int result = success.incrementAndGet();
         log.info("finish={}",result);

+ 35 - 0
src/main/resources/application-dev.properties

@@ -62,6 +62,41 @@ spring.data.mongodb.uri=mongodb://114.135.61.188:17017/udip
 logging.level.org.springframework.data.mongodb.core=DEBUG
 
 
+# redis
+#spring.redis.host=127.0.0.1
+spring.redis.host=114.135.61.188
+spring.redis.port=26379
+spring.redis.password=zoniot
+#spring.redis.port=6379
+#spring.redis.password=100Zone@123
+spring.redis.database=10
+spring.redis.timeout=10000ms
+spring.redis.jedis.pool.max-active=8
+spring.redis.jedis.pool.max-idle=8
+spring.redis.jedis.pool.max-wait=-1ms
+spring.redis.jedis.pool.min-idle=2
+
+#rabbitmq
+spring.rabbitmq.addresses=114.135.61.188:55672
+spring.rabbitmq.username=zoniot
+spring.rabbitmq.password=zcxk100
+spring.rabbitmq.virtual-host=/
+spring.rabbitmq.connection-timeout=15000
+spring.rabbitmq.template.mandatory=true
+
+# 开启发送确认
+spring.rabbitmq.publisher-confirms=true
+# 开启发送失败退回
+spring.rabbitmq.publisher-returns=true
+# 开启ACK
+spring.rabbitmq.listener.direct.acknowledge-mode=manual
+spring.rabbitmq.listener.simple.acknowledge-mode=manual
+
+spring.rabbitmq.exchange=batch-handler-exchange
+spring.rabbitmq.queue=import-device-handler-queue
+
+
+