|
@@ -3,6 +3,7 @@ package com.bz.smart_city.kafka.consumer;
|
|
|
import com.bz.smart_city.commom.util.MessageSend.JSONUtils;
|
|
|
import com.bz.smart_city.dao.OperatingValveRecordMapper;
|
|
|
import com.bz.smart_city.dto.ValveControlDataDto;
|
|
|
+import com.bz.smart_city.entity.Device;
|
|
|
import com.bz.smart_city.entity.OperatingValveRecord;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
|
@@ -31,9 +32,10 @@ public class ValveStatusReceiver {
|
|
|
operatingValveRecord.setStatus(1);
|
|
|
List<OperatingValveRecord> operatingValveRecords = operatingValveRecordMapper.selectList(operatingValveRecord);
|
|
|
OperatingValveRecord operatingValveRecordMessage=null;
|
|
|
- log.info("上报水表信息");
|
|
|
+ log.info("上报水表信息={}",message);
|
|
|
+ ValveControlDataDto valveControlDataDto = JSONUtils.json2pojo(message, ValveControlDataDto.class);
|
|
|
if(operatingValveRecords.size()>0){
|
|
|
- ValveControlDataDto valveControlDataDto = JSONUtils.json2pojo(message, ValveControlDataDto.class);
|
|
|
+
|
|
|
boolean flag=false;
|
|
|
for (OperatingValveRecord valveRecord : operatingValveRecords) {
|
|
|
if(valveRecord.getWaterNo().equals(valveControlDataDto.getMeterNo())
|
|
@@ -45,12 +47,23 @@ public class ValveStatusReceiver {
|
|
|
}
|
|
|
}
|
|
|
if(flag){
|
|
|
+
|
|
|
+ operatingValveRecordMapper.updateStatus(operatingValveRecordMessage);
|
|
|
+ }
|
|
|
+ if(operatingValveRecordMessage==null){
|
|
|
+ operatingValveRecordMessage=new OperatingValveRecord();
|
|
|
+ operatingValveRecordMessage.setWaterNo(valveControlDataDto.getMeterNo());
|
|
|
+ operatingValveRecordMessage.setCustomerId(valveControlDataDto.getCustomerId());
|
|
|
+ }
|
|
|
+ Device device=operatingValveRecordMapper.getDevice(operatingValveRecordMessage);
|
|
|
+ if(device!=null){
|
|
|
+ log.info("同步水表阀门状态 电子号={}",device.getMetercode());
|
|
|
Map<String,Object>valveMessage=new HashMap<>();
|
|
|
- valveMessage.put("meterNo",operatingValveRecordMessage.getWaterFileNo());
|
|
|
- valveMessage.put("meterStatus",operatingValveRecordMessage.getOperation());
|
|
|
+ valveMessage.put("meterNo",device.getMetercode());
|
|
|
+ valveMessage.put("meterStatus",Integer.parseInt(valveControlDataDto.getValve()));
|
|
|
kafkaTemplate.send(billingValveStatusReceiving, JSONUtils.obj2json(valveMessage));
|
|
|
- operatingValveRecordMapper.updateStatus(operatingValveRecordMessage);
|
|
|
}
|
|
|
+
|
|
|
}
|
|
|
}
|
|
|
}
|