Browse Source

设备数据写入到MongoDB PengDi@2021/1/6

pengdi@zoniot.com 4 years ago
parent
commit
242b3696de

+ 8 - 2
meter-reading-common/src/main/java/com/huaxu/zoniot/entity/MeterReadRecord.java

@@ -182,8 +182,14 @@ public class MeterReadRecord implements Serializable {
     private Integer manufacturerId ;
 
     private String manufacturerName ;
+
+    private Integer channelNumberId;
+
+    private String channelName ;
+
     /**
-     * 最后上送数据
+     * 最后上送数据,格式如下:
+     * {"数据ID":11111,data:{"WSV":"1","VOL":"2.5"}}
      */
-    private Map<String, MeasuringData> lastSendData ;
+    private Map<String, String> lastSendData ;
 }

+ 4 - 0
meter-reading-common/src/main/java/com/huaxu/zoniot/entity/WaterMeter.java

@@ -158,4 +158,8 @@ public class WaterMeter {
     private Integer manufacturerId ;
 
     private String manufacturerName ;
+
+    private Integer channelNumberId;
+
+    private String channelName ;
 }

+ 14 - 5
meter-reading-common/src/main/java/com/huaxu/zoniot/service/impl/MeterReadRecordServiceImpl.java

@@ -32,10 +32,7 @@ import org.springframework.transaction.annotation.Transactional;
 
 import javax.annotation.Resource;
 import java.math.BigDecimal;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * <p>抄表记录服务实现类</p>
@@ -202,6 +199,9 @@ public class MeterReadRecordServiceImpl implements MeterReadRecordService {
         meterReadRecord.setCollectorNo(waterMeter.getCollectorNo());
         meterReadRecord.setBuildingName(waterMeter.getBuildingName());
         meterReadRecord.setCommunityName(waterMeter.getCommunityName());
+        meterReadRecord.setChannelNumberId(waterMeter.getChannelNumberId());
+        meterReadRecord.setChannelName(waterMeter.getChannelName());
+
         return meterReadRecord ;
     }
 
@@ -380,7 +380,7 @@ public class MeterReadRecordServiceImpl implements MeterReadRecordService {
             meterReadRecordMapper.updateByPrimaryKeySelective(update) ;
             // 同时也将mongo抄表记录更新
             try {
-                record.setLastSendData(measuringData);
+                record.setLastSendData(getMeasuringValueMap(measuringData));
                 update.setLastCost(todayCost);
                 record.setReadData(currentReading);
                 record.setLastValid(currentReading);
@@ -408,6 +408,15 @@ public class MeterReadRecordServiceImpl implements MeterReadRecordService {
         log.debug("-----end meter reading , meter = {},readDay = {},Cost = {}ms",waterMeter.getDeviceId(),readDay,clock.getTime());
     }
 
+    protected Map<String,String> getMeasuringValueMap(Map<String, MeasuringData> measuringData){
+        Map<String,String> measuringValueMap = new HashMap<>(measuringData.size());
+        Iterator<Map.Entry<String, MeasuringData>> iterator = measuringData.entrySet().iterator();
+        while(iterator.hasNext()){
+            Map.Entry<String, MeasuringData> next = iterator.next();
+            measuringValueMap.put(next.getValue().getMeasuringCode(),next.getValue().getMeasuringVaule());
+        }
+        return measuringValueMap ;
+    }
     @Override
     public void meterReading(Long meterId, Map<String, MeasuringData> measuringData) {
         WaterMeter meter = waterMeterMapper.findWaterMeterById(meterId);

+ 4 - 3
meter-reading-common/src/main/resources/mapper/WaterMeterMapper.xml

@@ -40,8 +40,6 @@
       <result column="manufacturer_name" jdbcType="VARCHAR" property="manufacturerName" />
 
 
-
-
   </resultMap>
   <sql id="Base_Column_List">
     <!--@mbg.generated-->
@@ -81,7 +79,9 @@
 	  concat_ws('/',m.name,dt.equipment_type,dt.model) as device_type_name,
 	  d.manufacturer_id as manufacturer_id ,
 	  m.`name` as manufacturer_name,
-      cu.customer_name as customer_name
+      cu.customer_name as customer_name,
+      cn.channel_name as channel_name ,
+      wrd.channel_number_id as channel_number_id
   </sql>
   <sql id="Base_Meter_Query_Where">
       d.`status` = 1
@@ -112,6 +112,7 @@
 	  left join sc_device_type dt on (d.device_type = dt.id)
 	  left join sc_device_manufacturer m on (d.manufacturer_id = m.id)
 	  left join sc_customer cu on (cu.id = d.customer_id)
+	  left join sc_channel_number cn on(cn.id = wrd.channel_number_id)
   </sql>
   <select id="findAllWaterMeterList" resultMap="BaseResultMap">
     <include refid="Base_Meter_Query" />

+ 5 - 1
smart-city-intf/pom.xml

@@ -110,7 +110,11 @@
 			<groupId>org.springframework.boot</groupId>
 			<artifactId>spring-boot-starter-amqp</artifactId>
 		</dependency>
-
+		<!-- SpringBoot MongoDB-->
+		<dependency>
+			<groupId>org.springframework.boot</groupId>
+			<artifactId>spring-boot-starter-data-mongodb</artifactId>
+		</dependency>
 		<dependency>
 			<groupId>com.aliyun.mq</groupId>
 			<artifactId>mq-http-sdk</artifactId>

+ 51 - 0
smart-city-intf/src/main/java/com/zcxk/smartcity/data/access/config/MongoDBConfig.java

@@ -0,0 +1,51 @@
+package com.zcxk.smartcity.data.access.config;
+
+import com.mongodb.MongoClientOptions;
+import org.springframework.beans.factory.BeanFactory;
+import org.springframework.beans.factory.NoSuchBeanDefinitionException;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.convert.CustomConversions;
+import org.springframework.data.mongodb.MongoDbFactory;
+import org.springframework.data.mongodb.core.convert.DbRefResolver;
+import org.springframework.data.mongodb.core.convert.DefaultDbRefResolver;
+import org.springframework.data.mongodb.core.convert.DefaultMongoTypeMapper;
+import org.springframework.data.mongodb.core.convert.MappingMongoConverter;
+import org.springframework.data.mongodb.core.mapping.MongoMappingContext;
+
+/**
+ * @author pengdi
+ */
+@Configuration
+public class MongoDBConfig {
+    /**
+     * <p>解决 MongoSocketReadTimeoutException异常</p>
+     * @return
+     */
+    @Bean
+    public MongoClientOptions mongoOptions() {
+        return MongoClientOptions
+            .builder()
+            .maxConnectionIdleTime(60000)
+            .build();
+    }
+
+    /**
+     * <p>去除文档中的_class字段</p>
+     * @param factory
+     * @param context
+     * @param beanFactory
+     * @return
+     */
+    @Bean
+    public MappingMongoConverter mappingMongoConverter(MongoDbFactory factory, MongoMappingContext context, BeanFactory beanFactory) {
+        DbRefResolver dbRefResolver = new DefaultDbRefResolver(factory);
+        MappingMongoConverter mappingConverter = new MappingMongoConverter(dbRefResolver, context);
+        try {
+            mappingConverter.setCustomConversions(beanFactory.getBean(CustomConversions.class));
+        } catch (NoSuchBeanDefinitionException ignore) {
+        }
+        mappingConverter.setTypeMapper(new DefaultMongoTypeMapper(null));
+        return mappingConverter;
+    }
+}

+ 107 - 0
smart-city-intf/src/main/java/com/zcxk/smartcity/data/access/dto/DeviceData.java

@@ -0,0 +1,107 @@
+package com.zcxk.smartcity.data.access.dto;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+import java.io.Serializable;
+import java.time.LocalDateTime;
+import java.util.Date;
+import java.util.Map;
+
+/**
+ * 
+ * @Description  
+ * <p>设备数据表,按照设备ID进行分片</p>
+ * sh.enableSharding("iot-water-database")
+ * sh.shardCollection("iot-water-database.sc_device_data",{"receiveDate":1,"deviceId":1});
+ * @author wilian.peng
+ * @date 2019年11月12日 下午10:05:10
+ */
+@Data
+@ApiModel("设备数据表")
+@Document(collection = "sc_device_data")
+public class DeviceData implements Serializable{
+	private static final long serialVersionUID = 4357722209160768792L;
+	@Id
+	@ApiModelProperty("id")
+	private Long id;
+
+	@ApiModelProperty("设备ID")
+	private Long deviceId ;
+
+	@ApiModelProperty("设备编号")
+	private String deviceNo;
+
+	@ApiModelProperty("站点ID")
+	private Integer siteId ;
+
+	@ApiModelProperty(value = "厂商", position = 9)
+	private Integer manufacturerId;
+
+//	@ApiModelProperty(value = "厂商名称", position = 9)
+//	private String manufacturerName ;
+
+	@ApiModelProperty("场景ID")
+	private Integer channelId;
+
+	@ApiModelProperty("设备类型ID")
+	private Integer deviceTypeId ;
+
+//	@ApiModelProperty("设备类型名称")
+//	private String deviceTypeName;
+
+	@ApiModelProperty("建筑Id")
+	private Integer buildingId;
+
+//	private String buildingName ;
+
+	@ApiModelProperty("小区Id")
+	private Integer communityId;
+
+//	private String  communityName ;
+
+//	@ApiModelProperty("省Id")
+//	private Integer provinceId;
+//
+//	private String provinceName ;
+
+//	@ApiModelProperty("市Id")
+//	private Integer cityId;
+//
+//	private String cityName ;
+
+//	@ApiModelProperty("区Id")
+//	private Integer regionId;
+//
+//	private String regionName ;
+
+	@ApiModelProperty("设备地址")
+	private String location ;
+
+	@ApiModelProperty("上报日期")
+	private Integer receiveDate ;
+
+	@ApiModelProperty("上报详细时间")
+	private Date receiveTime;
+
+	@ApiModelProperty("状态")
+	private Integer status;
+
+	@ApiModelProperty("上报详细数据")
+	private Map<String,String> data;
+
+	@ApiModelProperty("创建时间")
+	private LocalDateTime dateCreate;
+
+	@ApiModelProperty("更新时间")
+	private LocalDateTime dateUpdate ;
+
+	@ApiModelProperty("客户Id")
+	private Integer customerId;
+
+//	@ApiModelProperty("客户名称")
+//	private String customerName ;
+}

+ 14 - 0
smart-city-intf/src/main/java/com/zcxk/smartcity/data/access/repository/DeviceDataRepository.java

@@ -0,0 +1,14 @@
+package com.zcxk.smartcity.data.access.repository;
+
+import com.zcxk.smartcity.data.access.dto.DeviceData;
+import org.springframework.data.mongodb.repository.MongoRepository;
+
+/**
+ * <p>设备数据MongoDB仓库类</p>
+ *
+ * @Author wilian.peng
+ * @Date 2021/1/7 11:41
+ * @Version 1.0
+ */
+public interface DeviceDataRepository extends MongoRepository<DeviceData,Long> {
+}

+ 47 - 5
smart-city-intf/src/main/java/com/zcxk/smartcity/data/access/service/impl/DeviceDataServiceImpl.java

@@ -2,14 +2,13 @@ package com.zcxk.smartcity.data.access.service.impl;
 
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.time.LocalDateTime;
+import java.util.*;
 
 import com.zcxk.smartcity.data.access.common.CommonConstant;
+import com.zcxk.smartcity.data.access.dto.DeviceData;
 import com.zcxk.smartcity.data.access.rabbitmq.WaterMeterDataSender;
+import com.zcxk.smartcity.data.access.repository.DeviceDataRepository;
 import org.apache.commons.collections.MapUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.logging.Log;
@@ -112,6 +111,9 @@ public class DeviceDataServiceImpl implements DeviceDataService {
 	@Resource
 	WaterMeterDataSender waterMeterDataSender ;
 
+	@Autowired
+	DeviceDataRepository  deviceDataRepository ;
+
 	@Override
 	public int savaDeviceData(Device device, DeviceDataBase srcData, ProtocolData data) {
 		log.info(
@@ -557,6 +559,15 @@ public class DeviceDataServiceImpl implements DeviceDataService {
 				i++;
 			}
 		}
+		// 将数据保存到MongoDB中
+		try{
+			// 构建DeviceData对象
+			DeviceData deviceData = buildDeviceData(device, dataMap, receiveTime);
+			deviceDataRepository.save(deviceData);
+		}catch (Exception e){
+			e.printStackTrace();
+			log.error("Device data save mongo failed !",e);
+		}
 		// 2,判断设备是否水表,如果是水表则保存读数
 		if(device.getIsWater() == 1) {
 			// 如果是水表数据则继续将数据发送到抄表队列供抄表服务进行抄表 2020/12/25
@@ -589,6 +600,37 @@ public class DeviceDataServiceImpl implements DeviceDataService {
 		return i;
 	}
 
+	protected DeviceData buildDeviceData(Device device, Map<String, MeasuringData> dataMap, String receiveTime){
+		DeviceData deviceData = new DeviceData();
+		deviceData.setChannelId(device.getSysId());
+		deviceData.setCommunityId(device.getCommunityId());
+		deviceData.setCustomerId(device.getCustomerId());
+		deviceData.setDateCreate(LocalDateTime.now());
+		deviceData.setDeviceId(device.getId());
+		deviceData.setDeviceNo(device.getDeviceNo());
+		deviceData.setDeviceTypeId(device.getDeviceType());
+		deviceData.setId(idWorker.nextId());
+		deviceData.setLocation(device.getLocDesc());
+		deviceData.setManufacturerId(device.getManufacturerId());
+		deviceData.setReceiveDate(Integer.parseInt(receiveTime.substring(0, 8)));
+		deviceData.setReceiveTime(parserDate(receiveTime));
+		deviceData.setSiteId(device.getSiteId());
+		deviceData.setStatus(1);
+		deviceData.setData(getDeviceMeasuringValueMap(dataMap));
+		deviceData.setBuildingId(device.getBuildingId());
+		return deviceData ;
+	}
+
+	protected  Map<String,String> getDeviceMeasuringValueMap(Map<String, MeasuringData> dataMap){
+		Map<String,String> measuringValueMap = new HashMap<>(dataMap.size());
+		Iterator<Map.Entry<String, MeasuringData>> iterator = dataMap.entrySet().iterator();
+		while(iterator.hasNext()){
+			Map.Entry<String, MeasuringData> next = iterator.next();
+			measuringValueMap.put(next.getValue().getMeasuringCode(),next.getValue().getMeasuringVaule());
+		}
+		return measuringValueMap ;
+	}
+
 	protected Map<String, MeasuringData> measuringDataListToMap(List<MeasuringData> measuringDataList) {
 		Map<String, MeasuringData> map = new HashMap<String, MeasuringData>(measuringDataList.size());
 		for (MeasuringData data : measuringDataList) {

+ 4 - 1
smart-city-intf/src/main/resources/application-188sit.properties

@@ -32,7 +32,7 @@ mybatis.configuration.use-column-label=true
 
 spring.jackson.time-zone=GMT+8
 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
-spring.jackson.joda-date-time-format: yyyy-MM-dd HH:mm:ss
+spring.jackson.joda-date-time-format=yyyy-MM-dd HH:mm:ss
 
 #redis
 spring.redis.host=114.135.61.188
@@ -55,3 +55,6 @@ com.zcxk.kafka.alarm.topic=sc_alarm_topic_sit
 com.zcxk.kafka.card.topic=sc_card_topic_sit
 com.zcxk.kafka.single.data.topic=sc_single_data_topic
 com.zcxk.kafka.multi.data.topic=sc_multi_data_topic
+######################################################MongoDBÅäÖÃ#####################################################
+spring.data.mongodb.uri=mongodb://114.135.61.188:17017/iot-water-database
+logging.level.org.springframework.data.mongodb.core=DEBUG

+ 5 - 6
smart-city-intf/src/main/resources/application-dev.properties

@@ -31,7 +31,7 @@ mybatis.configuration.use-column-label=true
 
 spring.jackson.time-zone=GMT+8
 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
-spring.jackson.joda-date-time-format: yyyy-MM-dd HH:mm:ss
+spring.jackson.joda-date-time-format=yyyy-MM-dd HH:mm:ss
 #redis config
 spring.redis.host=114.135.61.188
 spring.redis.port=26379
@@ -60,14 +60,11 @@ aliyun.nms.access.key=LTAI9bq4vr6ClPGW
 aliyun.nms.secret.key=hKYWEneQH2hQOs2DnG4PsAUqk4Thps
 aliyun.nms.server.name=http://MQ_INST_1159852013518008_Ba5mtzmw.mq-internet-access.mq-internet.aliyuncs.com:80
 aliyun.nms.group.id=GID_ZONIOT_NMS_DEV
-
-
 ##############################RabbitMQ#########################################
 spring.rabbitmq.host=114.135.61.188
 spring.rabbitmq.port=55672
 spring.rabbitmq.username=zoniot
 spring.rabbitmq.password=zcxk100
-
 # 开启发送确认
 spring.rabbitmq.publisher-confirms=true
 # 开启发送失败退回
@@ -75,6 +72,8 @@ spring.rabbitmq.publisher-returns=true
 # 开启ACK
 spring.rabbitmq.listener.direct.acknowledge-mode=manual
 spring.rabbitmq.listener.simple.acknowledge-mode=manual
-
 com.zcxk.rabbitmq.water.meter.queue=water_meter_queue
-com.zcxk.rabbitmq.command.status.queue=command_status_queue
+com.zcxk.rabbitmq.command.status.queue=command_status_queue
+######################################################MongoDB配置#####################################################
+spring.data.mongodb.uri=mongodb://114.135.61.188:17017/iot-water-database
+logging.level.org.springframework.data.mongodb.core=DEBUG

+ 3 - 1
smart-city-intf/src/main/resources/application-sit-node0.properties

@@ -6,7 +6,6 @@ server.workId=0
 spring.jackson.time-zone=GMT+8
 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
 spring.jackson.joda-date-time-format=yyyy-MM-dd HH:mm:ss
-spring.data.mongodb.uri=mongodb://39.108.172.131:27017/smart-city
 ###################################################数据库配置######################################################
 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
 spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
@@ -76,3 +75,6 @@ spring.rabbitmq.listener.simple.acknowledge-mode=manual
 com.zcxk.rabbitmq.water.meter.queue=water_meter_queue
 com.zcxk.rabbitmq.command.status.queue=command_status_queue
 com.zcxk.rabbitmq.meter.data.exchange=meter-data-exchange
+######################################################MongoDB配置#####################################################
+spring.data.mongodb.uri=mongodb://114.135.61.188:17017/iot-water-database
+logging.level.org.springframework.data.mongodb.core=DEBUG

+ 4 - 2
smart-city-intf/src/main/resources/application-sit-node1.properties

@@ -6,7 +6,6 @@ server.workId=1
 spring.jackson.time-zone=GMT+8
 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
 spring.jackson.joda-date-time-format=yyyy-MM-dd HH:mm:ss
-spring.data.mongodb.uri=mongodb://39.108.172.131:27017/smart-city
 ###################################################数据库配置######################################################
 spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
 spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
@@ -75,4 +74,7 @@ spring.rabbitmq.listener.direct.acknowledge-mode=manual
 spring.rabbitmq.listener.simple.acknowledge-mode=manual
 com.zcxk.rabbitmq.water.meter.queue=water_meter_queue
 com.zcxk.rabbitmq.command.status.queue=command_status_queue
-com.zcxk.rabbitmq.mete.data.exchange=meter-data-exchange
+com.zcxk.rabbitmq.mete.data.exchange=meter-data-exchange
+######################################################MongoDB配置#####################################################
+spring.data.mongodb.uri=mongodb://114.135.61.188:17017/iot-water-database
+logging.level.org.springframework.data.mongodb.core=DEBUG

+ 4 - 3
smart-city-intf/src/main/resources/application-test.properties

@@ -32,9 +32,7 @@ mybatis.configuration.use-column-label=true
 
 spring.jackson.time-zone=GMT+8
 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
-spring.jackson.joda-date-time-format: yyyy-MM-dd HH:mm:ss
-
-spring.data.mongodb.uri= mongodb://39.108.172.131:27017/smart-city
+spring.jackson.joda-date-time-format=yyyy-MM-dd HH:mm:ss
 
 #redis
 spring.redis.host=114.135.61.188
@@ -62,3 +60,6 @@ aliyun.nms.access.key=LTAI9bq4vr6ClPGW
 aliyun.nms.secret.key=hKYWEneQH2hQOs2DnG4PsAUqk4Thps
 aliyun.nms.server.name=http://MQ_INST_1159852013518008_Ba5mtzmw.mq-internet-access.mq-internet.aliyuncs.com:80
 aliyun.nms.group.id=GID_ZONIOT_NMS_TEST
+######################################################MongoDBÅäÖÃ#####################################################
+spring.data.mongodb.uri=mongodb://114.135.61.188:17017/iot-water-database
+logging.level.org.springframework.data.mongodb.core=DEBUG

+ 5 - 2
smart-city-intf/src/main/resources/application-uat.properties

@@ -31,7 +31,7 @@ mybatis.configuration.use-column-label=true
 
 spring.jackson.time-zone=GMT+8
 spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
-spring.jackson.joda-date-time-format: yyyy-MM-dd HH:mm:ss
+spring.jackson.joda-date-time-format=yyyy-MM-dd HH:mm:ss
 
 #redis
 spring.redis.host=114.135.61.188
@@ -76,4 +76,7 @@ spring.rabbitmq.listener.direct.acknowledge-mode=manual
 spring.rabbitmq.listener.simple.acknowledge-mode=manual
 
 com.zcxk.rabbitmq.water.meter.queue=water_meter_queue_uat
-com.zcxk.rabbitmq.command.status.queue=command_status_queue_uat
+com.zcxk.rabbitmq.command.status.queue=command_status_queue_uat
+######################################################MongoDB配置#####################################################
+spring.data.mongodb.uri=mongodb://114.135.61.188:17017/iot-water-database
+logging.level.org.springframework.data.mongodb.core=DEBUG