浏览代码

新增发送物联网数据到抄表系统功能 PengDi@2021/1/4

pengdi@zoniot.com 4 年之前
父节点
当前提交
c97ce410d4

+ 93 - 63
smart-city-intf/src/main/java/com/zcxk/smartcity/data/access/service/impl/DeviceDataServiceV2Impl.java

@@ -2,6 +2,7 @@ package com.zcxk.smartcity.data.access.service.impl;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONObject;
+import com.zcxk.smartcity.data.access.common.CommonConstant;
 import com.zcxk.smartcity.data.access.common.RedisUtil;
 import com.zcxk.smartcity.data.access.dao.*;
 import com.zcxk.smartcity.data.access.dto.ConfigDataDto;
@@ -9,6 +10,7 @@ import com.zcxk.smartcity.data.access.dto.UdipUnitDataDTO;
 import com.zcxk.smartcity.data.access.dto.ValveControlDataDto;
 import com.zcxk.smartcity.data.access.entity.*;
 import com.zcxk.smartcity.data.access.protocol.model.MeasuringData;
+import com.zcxk.smartcity.data.access.rabbitmq.WaterMeterDataSender;
 import com.zcxk.smartcity.data.access.service.ConfigService;
 import com.zcxk.smartcity.data.access.service.DeviceDataServiceV2;
 import com.zcxk.smartcity.data.access.util.JacksonUtil;
@@ -26,6 +28,7 @@ import org.springframework.expression.spel.support.StandardEvaluationContext;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Service;
 
+import javax.annotation.Resource;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.*;
@@ -36,7 +39,7 @@ import static com.google.common.collect.Lists.newArrayList;
 @Service
 public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
 
-    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss"); // yyyy-MM-dd HH:mm:ss
+    private static final SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");
     private static EvaluationContext context = new StandardEvaluationContext();
     private static ExpressionParser parser = new SpelExpressionParser();
     @Autowired
@@ -79,53 +82,52 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
     @Value("${com.zcxk.kafka.accept_valve_state.topic}")
     private String acceptValveStateTopic;
 
+    @Resource
+    WaterMeterDataSender waterMeterDataSender;
 
     @Override
-    public int savaDeviceData(Device device, UdipUnitDataDTO unitData) {
-        log.info("begin savaDeviceData,device = {} ,data = {}" , JSON.toJSONString(device) ,JSON.toJSONString(unitData));
+    public int saveDeviceData(Device device, UdipUnitDataDTO unitData) {
+        log.info("begin saveDeviceData,device = {} ,data = {}" , JSON.toJSONString(device) ,JSON.toJSONString(unitData));
 
         JSONObject jsonObject = JSONObject.parseObject(unitData.getParsedData());
-        //JSONObject data = JSONObject.parseObject(jsonObject.getString("data"));
         Map<String, Object> map = (Map<String, Object>)jsonObject;
 
-
-        //log.info("map = {}" , JSON.toJSONString(map));
-        List<MeasuringData> measuringDatas = newArrayList();
+        List<MeasuringData> measuringDataList = newArrayList();
         for (Map.Entry<String, Object> m : map.entrySet()) {
             MeasuringData measuringData = new MeasuringData();
             measuringData.setMeasuringCode(m.getKey());
             measuringData.setMeasuringVaule(String.valueOf(m.getValue()));
-            measuringDatas.add(measuringData);
+            measuringDataList.add(measuringData);
         }
+        // 2018 12 21 10 06 59
+        String lastUpdateTime = sdf.format(unitData.getEventTime());
 
-        String lastUpateTime = sdf.format(unitData.getEventTime()); // 2018 12 21 10 06 59
-
-        Map<String, MeasuringData> measuringDataMap = measuringDataListToMap(measuringDatas);
+        Map<String, MeasuringData> measuringDataMap = measuringDataListToMap(measuringDataList);
         // 将设备上报数据存入缓存中
         try {
             // 构造上报时间测点数据
             MeasuringData tmd = new MeasuringData();
             tmd.setMeasuringCode("TIME");
             tmd.setMeasuringName("上报时间");
-            tmd.setMeasuringVaule(lastUpateTime);
+            tmd.setMeasuringVaule(lastUpdateTime);
 
             measuringDataMap.put("TIME", tmd);
             // 以设备号为KEY,对应测点数据为值放入到缓存
             redisUtil.set(String.valueOf(device.getId()), JSON.toJSONString(measuringDataMap));
-            //redisUtil.set("deviceLastData:"+String.valueOf(device.getId()), JSON.toJSONString(measuringDataMap));
         } catch (Exception e) {
             log.error("save device data to cache failed", e);
         }
 
         // 保存设备测点数据
-        int j = saveDeviceMeasuringDatas(device, measuringDataMap, lastUpateTime,unitData.getEventTime());
+        int j = saveDeviceMeasuringDatas(device, measuringDataMap, lastUpdateTime,unitData.getEventTime());
         // 从测点数据中解析设备产生的告警
-        List<DeviceError> errorList = saveDeviceAlarmDatas(device,measuringDataMap,lastUpateTime);
+        List<DeviceError> errorList = saveDeviceAlarmDatas(device,measuringDataMap,lastUpdateTime);
         // 处理产生的告警信息
         List<DeviceError> processAlarmList = processAlarmList(device, errorList);
         // 更新设备状态
-        int newStatus = changeDeviceStatus(device,processAlarmList,lastUpateTime);
-        if(newStatus != 2) { // 新设备状态若不为故障,则将设备连续故障天数清零
+        int newStatus = changeDeviceStatus(device,processAlarmList,lastUpdateTime);
+        if(newStatus != CommonConstant.DEVICE_ERROR) {
+            // 新设备状态若不为故障,则将设备连续故障天数清零
             deviceMapper.updateDeviceErrorDays(device.getId(), 0);
         }
         // 推送报警信息
@@ -138,7 +140,7 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
             log.error("alarm message send failed !",e);
         }
 
-        log.info("end savaDeviceData,total measuring data size = {} ,device status = {} ",j,1);
+        log.info("end saveDeviceData,total measuring data size = {} ,device status = {} ",j,1);
         return j;
     }
 
@@ -148,9 +150,11 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
         int i = 0;
         for (DeviceMeasuringPoint dmp : dmps) {
             MeasuringData measuringData = dataMap.get(dmp.getCodeName());
-            if(measuringData == null) continue;
-            String measuringVaule = measuringData.getMeasuringVaule(); // 测点值
-            if (StringUtils.isNotEmpty(measuringVaule)) {
+            if(measuringData == null){
+                continue;
+            }
+            String measuringValue = measuringData.getMeasuringVaule();
+            if (StringUtils.isNotEmpty(measuringValue)) {
                 // 1,添加测点数据
                 DeviceDataDim dd = new DeviceDataDim();
                 dd.setId(idWorker.nextId());
@@ -161,7 +165,7 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
                 dd.setLocation(device.getLocDesc());
                 dd.setMeasuringId(dmp.getId());
                 dd.setMeasuringCode(dmp.getCodeName());
-                dd.setMeasuringData(measuringVaule);
+                dd.setMeasuringData(measuringValue);
                 dd.setSiteId(device.getSiteId());
                 dd.setSysId(device.getSysId());
                 dd.setSendDate(Integer.parseInt(lastUpateTime.substring(0, 8)));
@@ -173,6 +177,15 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
         }
         // 2,判断设备是否水表,如果是水表则保存读数
         if(device.getIsWater() == 1) {
+            try
+            {
+                waterMeterDataSender.sendMessage(device.getId(), dataMap);
+            }
+            catch (Exception e)
+            {
+                e.printStackTrace();
+                log.error("发送抄表数据失败,设备={}", device.getId(), e);
+            }
             MeasuringData wsvData = dataMap.get("WSV");
             if(wsvData != null) {
                 WaterMeterErrorDays record = new WaterMeterErrorDays();
@@ -181,8 +194,6 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
                 record.setStatus(1);
                 waterMeterErrorDaysMapper.insertSelective(record);
             }
-
-
             MeasuringData currentQuantity = dataMap.get("currentQuantity");
             MeasuringData valveState = dataMap.get("valveState");
             if(currentQuantity != null || valveState!=null) {
@@ -194,13 +205,12 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
                 }
                 if (valveState != null) {
                     if("0".equals(valveState.getMeasuringVaule())){
-                        record.setValveStatus(1);//0开,1关,2故障
+                        //0开,1关,2故障
+                        record.setValveStatus(1);
                     }else {
                         record.setValveStatus(0);
                     }
                     //推送阀门状态至抄表平台
-
-
                     if (device.getCustomerId() != null) {
                         if(this.matches(device.getCustomerId())){
                             ValveControlDataDto valveControlData = new ValveControlDataDto();
@@ -239,7 +249,6 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
 
     private boolean matches(Integer customerId) {
         List<ConfigDataDto> configDataList = configService.getConfigData("push_valve_config");
-        //log.info("customerId={},configDataList={}",customerId,JSON.toJSON(configDataList));
         if (configDataList != null && configDataList.size() > 0) {
             for (ConfigDataDto configDataDto : configDataList) {
                 if(StringUtils.equals(configDataDto.getValue(),customerId.toString())){
@@ -257,9 +266,9 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
         log.info("begin saveDeviceAlarmData  ,device = {} ,data = {}  " , JSON.toJSONString(device) ,JSON.toJSONString(dataMap));
         List<DeviceError> errorList = new ArrayList<DeviceError>();
         // 1,根据设备类型查找设备报警类型
-        Integer deviceType = device.getDeviceType(); // 设备类型
-        Integer siteId = device.getSiteId(); // 站点
-        Integer sysId = device.getSysId(); // 场景
+        Integer deviceType = device.getDeviceType();
+        Integer siteId = device.getSiteId();
+        Integer sysId = device.getSysId();
 
         AlarmType forQuery = new AlarmType();
         forQuery.setDeviceTypeId(deviceType);
@@ -286,31 +295,39 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
                 for (DeviceAlarmRule rule : rules) {
                     String measuringCode = rule.getMeasuringCode();
                     MeasuringData mdata = dataMap.get(measuringCode);
-                    if(mdata == null) break;
-                    String measuringData = mdata.getMeasuringVaule(); // 测点对应的值
-                    if(2 == rule.getMeasuringDataType()) { // 连续型
+                    if(mdata == null) {
+                        break;
+                    }
+                    String measuringData = mdata.getMeasuringVaule();
+                    // 测点对应的值
+                    if(2 == rule.getMeasuringDataType()) {
+                        // 连续型
                         context.setVariable("value", Double.parseDouble(measuringData));
                     }
-                    else context.setVariable("value", measuringData);
+                    else{
+                        context.setVariable("value", measuringData);
+                    }
                     Boolean current = parser.parseExpression(rule.getExpression()).getValue(context, Boolean.class);
-                    isOk = isOk & current;
-                    if(!isOk) break;
+                    isOk = isOk && current;
+                    if(!isOk) {
+                        break;
+                    }
                     else {
                         Map<String, Object> map = new HashMap<String,Object>();
                         map.put("measuringVaule", dataMap.get(measuringCode).getMeasuringVaule());
                         map.put("measuringUnit", dataMap.get(measuringCode).getMeasuringUnit());
                         map.put("measuringName", dataMap.get(measuringCode).getMeasuringName());
-                        map.put("ruleId", rule.getId()); //  规则ID
+                        map.put("ruleId", rule.getId());
                         MeasuringDesc p1 = new MeasuringDesc();
                         p1.setStatus(1);
                         p1.setMeasuringCode(measuringCode);
                         p1.setDeviceType(deviceType);
-                        MeasuringDesc md = measuringDescMapper.singleQuery((Map<String, Object>)MapTransformUtils.objectToMap(p1)); // 查询设备对应的测点信息
+                        MeasuringDesc md = measuringDescMapper.singleQuery((Map<String, Object>)MapTransformUtils.objectToMap(p1));
                         map.put("measuringId", md.getMeasuringId());
-                        if("2".equals(md.getMeasuringDataType())) { // 连续型
+                        if("2".equals(md.getMeasuringDataType())) {
                             map.put("showData", measuringData);
                         }
-                        else { // 离散型
+                        else {
                             Integer mdId = md.getId();
                             MeasuringDataDef p2 = new MeasuringDataDef();
                             p2.setStatus(1);
@@ -328,22 +345,26 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
                 if (isOk && alarmMeasuringDataMap.size()!= 0) {
                     DeviceError error = new DeviceError();
                     long alarmId = idWorker.nextId();
-                    error.setAlarmCategory(alarmType.getAlarmCategory()); // 1,业务报警 ;2,设备故障
+                    // 1,业务报警 ;2,设备故障
+                    error.setAlarmCategory(alarmType.getAlarmCategory());
                     error.setAlarmTypeId(alarmType.getId());
                     error.setDeviceId(device.getId());
                     error.setDeviceNo(device.getDeviceNo());
                     error.setBuildingId(device.getBuildingId());
                     error.setFloor(device.getFloorId());
-                    error.setHandleStatus("2"); // 1.已处理;2.未处理
+                    // 1.已处理;2.未处理
+                    error.setHandleStatus("2");
                     error.setId(alarmId);
                     error.setLocation(device.getLocDesc());
                     error.setSiteId(device.getSiteId());
                     error.setSysId(device.getSysId());
                     error.setStatus(1);
-                    error.setAlarmTime(parserDate(receiveTime)); // 首次告警时间
-                    error.setLastAlarmTime(parserDate(receiveTime)); // 最后告警时间
-                    error.setAlarmCount(1); // 告警次数
-//					deviceErrorMapper.insert(error);
+                    // 首次告警时间
+                    error.setAlarmTime(parserDate(receiveTime));
+                    // 最后告警时间
+                    error.setLastAlarmTime(parserDate(receiveTime));
+                    // 告警次数
+                    error.setAlarmCount(1);
                     errorList.add(error);
                     // 5,保存报警产生的测点数据
                     List<AlarmMeasuringData> amdList = new ArrayList<AlarmMeasuringData>();
@@ -363,7 +384,6 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
                     }
                     if(amdList !=null && amdList.size() != 0) {
                         error.setAlarmMeasuringData(amdList);
-//						alarmMeasuringDataMapper.insertList(amdList);
                     }
                 }
             }
@@ -383,7 +403,8 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
         // 1,查询最新告警记录
         List<DeviceError> lastErrorList = getDeviceLastErrorList(device);
         if(errors.size() != 0 ) {
-            if(lastErrorList== null || lastErrorList.size() == 0) { // 无最新告警记录,将告警记录直接入库
+            if(lastErrorList== null || lastErrorList.size() == 0) {
+                // 无最新告警记录,将告警记录直接入库
                 for(DeviceError  de :errors ) {
                     rtnList.add(saveNewDeviceError(de));
                 }
@@ -394,21 +415,26 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
                     boolean isSame = false ;
                     DeviceError originalError = null ;
                     for( DeviceError lde : lastErrorList) {
-                        if("2".equals(lde.getHandleStatus()) && lde.getAlarmTypeId() == de.getAlarmTypeId()) { // 处理状态  1.已处理;2.未处理
+                        if(CommonConstant.UN_HANDLE_STATUS.equals(lde.getHandleStatus()) &&
+                                lde.getAlarmTypeId().equals(de.getAlarmTypeId())) {
+                            // 处理状态  1.已处理;2.未处理
                             isSame = true ;
                             originalError = lde;
                             break ;
                         }
                     }
-                    if(!isSame) { // 告警类型与上次告警类型不同,则直接新增记录
+                    if(!isSame) {
+                        // 告警类型与上次告警类型不同,则直接新增记录
                         rtnList.add(saveNewDeviceError(de));
                     }
-                    else { // 告警类型与上次告警类型相同
+                    else {
+                        // 告警类型与上次告警类型相同
                         // 3,查询上次告警时间至本次告警时间是否有上传数据
                         Date lastAlarmTime = lastErrorList.get(0).getLastAlarmTime();
                         Date currentAlarmTime = de.getAlarmTime();
-                        List<DeviceDataDim> receiveDatas = deviceDataDimMapper.getDeviceDataBetweenDate(lastAlarmTime, currentAlarmTime, device.getId());
-                        if(receiveDatas != null && receiveDatas.size() != 0) { // 有新上报数据,记录为新告警
+                        List<DeviceDataDim> receiveDataList = deviceDataDimMapper.getDeviceDataBetweenDate(lastAlarmTime, currentAlarmTime, device.getId());
+                        if(receiveDataList != null && receiveDataList.size() != 0) {
+                            // 有新上报数据,记录为新告警
                             rtnList.add(saveNewDeviceError(de));
                         }
                         else { // 无新上报数据,更新最后一次报警的次数与最后上报时间
@@ -422,7 +448,7 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
     }
 
     protected Map<String, MeasuringData> measuringDataListToMap(List<MeasuringData> measuringDataList) {
-        Map<String, MeasuringData> map = new HashMap<String, MeasuringData>();
+        Map<String, MeasuringData> map = new HashMap<String, MeasuringData>(measuringDataList.size());
         for (MeasuringData data : measuringDataList) {
             map.put(data.getMeasuringCode(), data);
         }
@@ -448,9 +474,9 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
         param.setLastAlarmTime(newError.getAlarmTime());
         deviceErrorMapper.updateByPrimaryKeySelective(param);
         // 2,更新告警测点信息
-        List<AlarmMeasuringData> alarmMeasuringDatas = newError.getAlarmMeasuringData();
-        if(alarmMeasuringDatas != null && alarmMeasuringDatas.size() != 0) {
-            for(AlarmMeasuringData amd : alarmMeasuringDatas) {
+        List<AlarmMeasuringData> alarmMeasuringDataList = newError.getAlarmMeasuringData();
+        if(alarmMeasuringDataList != null && alarmMeasuringDataList.size() != 0) {
+            for(AlarmMeasuringData amd : alarmMeasuringDataList) {
                 // 根据告警与测点查询测点数据后进行更新
                 String measuringCode = amd.getMeasuringCode() ;
                 Long errorId = oldError.getId();
@@ -480,9 +506,9 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
         // 1,先保存告警基本信息;
         deviceErrorMapper.insert(error);
         // 2,再保存告警产生的测点信息
-        List<AlarmMeasuringData> alarmMeasuringDatas = error.getAlarmMeasuringData() ;
-        if(alarmMeasuringDatas != null && alarmMeasuringDatas.size() != 0) {
-            alarmMeasuringDataMapper.insertList(alarmMeasuringDatas);
+        List<AlarmMeasuringData> alarmMeasuringDataList = error.getAlarmMeasuringData() ;
+        if(alarmMeasuringDataList != null && alarmMeasuringDataList.size() != 0) {
+            alarmMeasuringDataMapper.insertList(alarmMeasuringDataList);
         }
         return error ;
     }
@@ -519,8 +545,12 @@ public class DeviceDataServiceV2Impl implements DeviceDataServiceV2 {
         Integer newStatus = 0 ;
         // 2,判断告警列表中是否包含故障告警
         boolean isFault = isFaultAlarm(errors);
-        if(isFault) newStatus = 2 ;
-        else newStatus = 1 ;
+        if(isFault) {
+            newStatus = CommonConstant.DEVICE_ERROR ;
+        }
+        else{
+            newStatus = CommonConstant.DEVICE_NORMAL ;
+        }
         // 3,更新最后上报时间与最新状态
         Device param = new Device();
         param.setId(device.getId());