Browse Source

kafka消息堆积 PengDi@2021/1/25

pengdi@zoniot.com 4 years ago
parent
commit
a3ad1c6c8c

+ 1 - 1
smart-city-intf/src/main/java/com/zcxk/smartcity/data/access/aliyun/consumer/AliNMSMessageConsumer.java

@@ -105,7 +105,7 @@ public class AliNMSMessageConsumer  implements InitializingBean,ApplicationConte
     					deviceDataBase.setIsAnalyse(2);
     				} else {
     					int i = 0;
-    					i = deviceDataService.savaDeviceData(device, deviceDataBase, parserData);
+    					i = deviceDataService.saveDeviceData(device, deviceDataBase, parserData);
     					if (i != 0) {
     						deviceDataBase.setIsAnalyse(1);
     					} else {

+ 32 - 28
smart-city-intf/src/main/java/com/zcxk/smartcity/data/access/kafka/consumer/SingleDataReceiver.java

@@ -102,7 +102,7 @@ public class SingleDataReceiver {
 		String mac = (String) jsonMap.get("mac");
 		if (jsonMap != null) {
 			// 1,上报数据入库
-			DeviceDataBase deviceDataBase = saveDeviceBaseData(jsonMap);
+			 DeviceDataBase deviceDataBase = saveDeviceBaseData(jsonMap);
 			// 2,查询设备,设备存在则继续按照协议解析数据
 			Device device = deviceService.findByDeviceNo((String) jsonMap.get("mac"));
 			ProtocolData parserData = null ;
@@ -111,34 +111,37 @@ public class SingleDataReceiver {
 				parserData = protocolEngine.parserData(device, (String) jsonMap.get("data"));
 				if (parserData == null) {
 					log.info("parser data is null , mac = {}",mac);
-					deviceDataBase.setIsAnalyse(2);
+					//deviceDataBase.setIsAnalyse(2);
 				} else {
-					int i = 0;
-					if (2 == device.getDeviceCategory()) {
-						// 网关设备数据,将网关里面包含得设备信息继续发送
-						List<ProtocolData> datas = dataFission(parserData);
-						if (datas.size() != 0) {
-							for (ProtocolData data : datas) {
-								i += deviceDataService.savaDeviceData(device, deviceDataBase, data);
-								DeviceDataMessage msg = buildDataMsg(device, data);
-								kafkaTemplate.send(singleDataTopic, JSON.toJSONString(msg));
-							}
-						}
-					} else { // 非网关设备数据
-						i = deviceDataService.savaDeviceData(device, deviceDataBase, parserData);
-						// 如果是工卡,则发送消息给工卡系统处理;
-						if (device.getIsCard() == 1) {
-							parserData.setDeviceNo(device.getDeviceNo());
-							log.info("Begin Send Job Card Message ! Msg = {} " ,JSON.toJSONString(parserData));
-							kafkaTemplate.send(cardTopic,JSON.toJSONString(parserData));
-							log.info("End Send Job Card Message ! Msg = {}" , JSON.toJSONString(parserData));
-						}
-					}
-					if (i != 0) {
-						deviceDataBase.setIsAnalyse(1);
-					} else {
-						deviceDataBase.setIsAnalyse(2);
+//					int i = 0;
+//					if (2 == device.getDeviceCategory()) {
+//						// 网关设备数据,将网关里面包含得设备信息继续发送
+//						List<ProtocolData> datas = dataFission(parserData);
+//						if (datas.size() != 0) {
+////							for (ProtocolData data : datas) {
+////								i += deviceDataService.savaDeviceData(device, deviceDataBase, data);
+////								DeviceDataMessage msg = buildDataMsg(device, data);
+////								kafkaTemplate.send(singleDataTopic, JSON.toJSONString(msg));
+////							}
+//						}
+//					} else { // 非网关设备数据
+//						i = deviceDataService.savaDeviceData(device, deviceDataBase, parserData);
+//						// 如果是工卡,则发送消息给工卡系统处理;
+////						if (device.getIsCard() == 1) {
+////							parserData.setDeviceNo(device.getDeviceNo());
+////							log.info("Begin Send Job Card Message ! Msg = {} " ,JSON.toJSONString(parserData));
+////							kafkaTemplate.send(cardTopic,JSON.toJSONString(parserData));
+////							log.info("End Send Job Card Message ! Msg = {}" , JSON.toJSONString(parserData));
+////						}
+//					}
+					if(device.getDeviceCategory() != 2){
+						deviceDataService.saveDeviceData(device, deviceDataBase, parserData);
 					}
+//					if (i != 0) {
+//						deviceDataBase.setIsAnalyse(1);
+//					} else {
+//						deviceDataBase.setIsAnalyse(2);
+//					}
 				}
 			}
 			// 3,判断设备数据信息是否需要推送
@@ -282,7 +285,8 @@ public class SingleDataReceiver {
 		deviceDataBase.setIsAnalyse(0);
 
 		deviceDataBase.setDateCreate(LocalDateTime.now());
-		deviceDataBaseService.insert(deviceDataBase);
+		// 写数据库暂时屏蔽
+		//deviceDataBaseService.insert(deviceDataBase);
 		// 2,将上报网关信息写于到网关表 网关数据暂时没用到先屏蔽
 //		List<DeviceDataGetway> gatewayList = buildGatewayList(deviceDataBase.getId(), message);
 //		if (gatewayList.size() != 0) {

+ 1 - 1
smart-city-intf/src/main/java/com/zcxk/smartcity/data/access/service/DeviceDataService.java

@@ -23,7 +23,7 @@ public interface DeviceDataService {
 	 * @param data
 	 * @return
 	 */
-	public int savaDeviceData(Device device ,DeviceDataBase srcData,  ProtocolData data);
+	public int saveDeviceData(Device device ,DeviceDataBase srcData,  ProtocolData data);
 	/**
 	 * 保存设备报警数据
 	 * @param device

+ 1 - 1
smart-city-intf/src/main/java/com/zcxk/smartcity/data/access/service/impl/DeviceDataServiceImpl.java

@@ -115,7 +115,7 @@ public class DeviceDataServiceImpl implements DeviceDataService {
 	DeviceDataRepository  deviceDataRepository ;
 
 	@Override
-	public int savaDeviceData(Device device, DeviceDataBase srcData, ProtocolData data) {
+	public int saveDeviceData(Device device, DeviceDataBase srcData, ProtocolData data) {
 		log.info(
 				"begin saveDeviceData,device = {} ,data = {}" ,JSON.toJSONString(device) ,JSON.toJSONString(data));
 		List<MeasuringData> measuringDataList = data.getMeasuringDatas();

+ 1 - 1
smart-city-intf/src/main/resources/application.properties

@@ -1,2 +1,2 @@
 #开发环境:dev  测试环境:sit  线上环境:prd
-spring.profiles.active=sit-node0
+spring.profiles.active=prd

+ 2 - 2
smart-city-intf/src/main/resources/mapper/DeviceMapper.xml

@@ -276,7 +276,7 @@
         LEFT JOIN sc_building b ON ( d.building_id = b.id )
         LEFT JOIN sc_w_meter_type mt ON ( d.device_type = mt.device_type_id)
         LEFT JOIN sc_channel_device_type_use tu on ( d.device_type = tu.device_type_id and tu.`status` = 1 )
-        left join sc_channel c on (tu.channel_id = c.id and c.channel_code = 'lora_meter' and c.`status` = 1)
+        left join sc_channel c on (tu.channel_id = c.id and c.channel_code = 'loraMeter' and c.`status` = 1)
         WHERE
         d.STATUS = 1
         AND d.device_no = #{deviceNo} limit 1
@@ -321,7 +321,7 @@
         LEFT JOIN sc_building b ON ( d.building_id = b.id )
         LEFT JOIN sc_w_meter_type mt ON ( d.device_type = mt.device_type_id)
         LEFT JOIN sc_channel_device_type_use tu on ( d.device_type = tu.device_type_id and tu.`status` = 1 )
-        left join sc_channel c on (tu.channel_id = c.id and c.channel_code = 'lora_meter' and c.`status` = 1)
+        left join sc_channel c on (tu.channel_id = c.id and c.channel_code = 'loraMeter' and c.`status` = 1)
         WHERE
         d.STATUS = 1
         AND d.water_meter_no = #{meterNo,jdbcType=VARCHAR} limit 1

+ 1 - 1
smart-city-intf/src/test/java/com/zcxk/smartcity/data/access/DeviceDataServiceTests.java

@@ -74,7 +74,7 @@ public class DeviceDataServiceTests {
 		Device device = deviceMapper.findByDeviceNo("004a770124007922");
 		DeviceDataBase data = deviceDataBaseMapper.findById(555848708944560128l);
 		ProtocolData parserData = protocolEngine.parserData(device, data.getData());
-		int i = deviceDataService.savaDeviceData(device, data, parserData);
+		int i = deviceDataService.saveDeviceData(device, data, parserData);
 		System.out.println(i);
 	}