|
@@ -0,0 +1,322 @@
|
|
|
+package com.bz.smart_city.service.impl;
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON;
|
|
|
+import com.bz.smart_city.commom.model.CommonQueryCondition;
|
|
|
+import com.bz.smart_city.commom.util.DateTimeUtil;
|
|
|
+import com.bz.smart_city.dao.DeviceMapper;
|
|
|
+import com.bz.smart_city.dao.MeterPushDataMapper;
|
|
|
+import com.bz.smart_city.dao.MeterReadRecordMapper;
|
|
|
+import com.bz.smart_city.dto.MessageData;
|
|
|
+import com.bz.smart_city.entity.Device;
|
|
|
+import com.bz.smart_city.entity.MeterPushData;
|
|
|
+import com.bz.smart_city.entity.MeterReadRecord;
|
|
|
+import com.bz.smart_city.entity.ScRabbitConfig;
|
|
|
+import com.bz.smart_city.service.SyncWaterMeterDataService;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.springframework.amqp.rabbit.connection.SimpleResourceHolder;
|
|
|
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.math.BigDecimal;
|
|
|
+import java.util.*;
|
|
|
+
|
|
|
+/**
|
|
|
+ * <p></p>
|
|
|
+ *
|
|
|
+ * @Author wilian.peng
|
|
|
+ * @Date 2020/3/5 18:56
|
|
|
+ * @Version 1.0
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+public class GeneralRabbitSyncService {
|
|
|
+ @Autowired
|
|
|
+ RabbitTemplate rabbitTemplate;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ MeterReadRecordMapper meterReadRecordMapper;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ DeviceMapper deviceMapper ;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ MeterPushDataMapper meterPushDataMapper ;
|
|
|
+
|
|
|
+ @Value("${spring.profiles.active}")
|
|
|
+ String env ;
|
|
|
+
|
|
|
+ /**
|
|
|
+ */
|
|
|
+ public void syncWaterMeterData(ScRabbitConfig config){
|
|
|
+ // 1,查询昨日抄表数据
|
|
|
+ int yesterday = Integer.parseInt(DateTimeUtil.formatDate(DateTimeUtil.beforeNow(1), "yyyyMMdd"));
|
|
|
+ int today = Integer.parseInt(DateTimeUtil.formatNow("yyyyMMdd"));
|
|
|
+ CommonQueryCondition condition = new CommonQueryCondition();
|
|
|
+ condition.setCustormerId(config.getCustomerId());
|
|
|
+ if("prd".equals(env)){
|
|
|
+ condition.setStartDate(yesterday);
|
|
|
+ condition.setEndDate(yesterday);
|
|
|
+ }
|
|
|
+ else{
|
|
|
+ condition.setStartDate(20201113);
|
|
|
+ condition.setEndDate(20201113);
|
|
|
+ }
|
|
|
+ // 只传已验收上线的
|
|
|
+ condition.setChannelId(40);
|
|
|
+ log.info("begin query meter read records ,condition = {}",JSON.toJSONString(condition));
|
|
|
+ List<MeterReadRecord> meterReadRecords = meterReadRecordMapper.queryMeterReadRecordWithCondtion(condition);
|
|
|
+ log.info("end query meter read records ,result size is = {}",meterReadRecords.size());
|
|
|
+ // 2,构建消息
|
|
|
+ for(MeterReadRecord record : meterReadRecords){
|
|
|
+ try{
|
|
|
+ Long deviceId = record.getDeviceId();
|
|
|
+ Device device = deviceMapper.findByDeviceId(deviceId);
|
|
|
+ // 设备为故障时不进行数据同步
|
|
|
+ if(device.getDeviceStatus() == 2){
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ MeterPushData mpd = new MeterPushData();
|
|
|
+ mpd.setCustomerId(device.getCustomerId());
|
|
|
+ mpd.setDeviceId(device.getId());
|
|
|
+ mpd.setDeviceNo(device.getDeviceNo());
|
|
|
+ mpd.setDeviceStatus(device.getDeviceStatus());
|
|
|
+ mpd.setMeterNo(device.getWaterMeterNo());
|
|
|
+ mpd.setSyncDataDate(new Date());
|
|
|
+ mpd.setSyncDate(today);
|
|
|
+ // 非故障设备时判断是否有抄表数据
|
|
|
+ if (record.getReadStatus().equals("1")) {
|
|
|
+ // 无抄表数据,则继续取上日的推送数据进行推送,若上一日无推送数据则取最近一次的抄表数据进行推送
|
|
|
+ MeterReadRecord meterLastReadRecord = meterReadRecordMapper.findMeterLastReadRecord(deviceId);
|
|
|
+ if (meterLastReadRecord == null) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ mpd.setRealData(meterLastReadRecord.getReadData());
|
|
|
+ mpd.setRealDataDate(meterLastReadRecord.getReadTime());
|
|
|
+ mpd.setSyncData(meterLastReadRecord.getReadData());
|
|
|
+ mpd.setGap(new BigDecimal(0));
|
|
|
+ } else {
|
|
|
+ // 有抄表数据,则判断与上日推送数据是否存在-3内的波动差
|
|
|
+ MeterPushData pushData = meterPushDataMapper.selectByPrimaryKey(device.getId(), yesterday);
|
|
|
+ // 若不存在推送数据,则直接推送今日抄表数据
|
|
|
+ if (pushData == null) {
|
|
|
+ mpd.setRealData(record.getReadData());
|
|
|
+ mpd.setRealDataDate(record.getReadTime());
|
|
|
+ mpd.setSyncData(record.getReadData());
|
|
|
+ mpd.setGap(new BigDecimal(0));
|
|
|
+ } else {
|
|
|
+ // 若存在推送数据则比较今日数据真实上报数据进行比较,若比上日推送数据大或者波动差>=-3则直接进行推送,
|
|
|
+ // 若波动差<-3则推送上日的推送数据
|
|
|
+ mpd.setRealData(record.getReadData());
|
|
|
+ mpd.setRealDataDate(record.getReadTime());
|
|
|
+ BigDecimal syncData = new BigDecimal(pushData.getSyncData());
|
|
|
+ BigDecimal readData = new BigDecimal(record.getReadData());
|
|
|
+ BigDecimal gap = readData.subtract(syncData);
|
|
|
+ // 今日上报数据大于等于昨日推送数据,则推送今日上报数据
|
|
|
+ if (gap.compareTo(new BigDecimal(0)) > -1) {
|
|
|
+ mpd.setSyncData(record.getReadData());
|
|
|
+ mpd.setGap(gap);
|
|
|
+ } else {
|
|
|
+ // 波动差>=-3则直接进行推送
|
|
|
+ if (gap.abs().compareTo(new BigDecimal(3)) < 1) {
|
|
|
+ mpd.setSyncData(record.getReadData());
|
|
|
+ mpd.setGap(gap);
|
|
|
+ }
|
|
|
+ // 若波动差<-3则推送上日的推送数据
|
|
|
+ else {
|
|
|
+ mpd.setSyncData(pushData.getSyncData());
|
|
|
+ mpd.setGap(gap);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 将推送记录进行存储,并将数据进行推送
|
|
|
+ meterPushDataMapper.insert(mpd);
|
|
|
+ Map<String, Object> message = buildSendMessage(mpd);
|
|
|
+ if (message != null) {
|
|
|
+ // 3,发送消息
|
|
|
+ sendMessage(config,JSON.toJSONString(message));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }catch (Exception e){
|
|
|
+ log.error("Synchronize Data Error ! ",e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ public void mockSyncWaterMeterData(ScRabbitConfig config,Long deviceId) {
|
|
|
+ // 1,查询昨日抄表数据
|
|
|
+ int yesterday = Integer.parseInt(DateTimeUtil.formatDate(DateTimeUtil.beforeNow(1), "yyyyMMdd"));
|
|
|
+ int today = Integer.parseInt(DateTimeUtil.formatNow("yyyyMMdd"));
|
|
|
+ MeterReadRecord record = meterReadRecordMapper.getLastDayData(deviceId, yesterday);
|
|
|
+ // 2,构建消息
|
|
|
+ if(record != null){
|
|
|
+ try{
|
|
|
+ Device device = deviceMapper.findByDeviceId(deviceId);
|
|
|
+ // 设备为故障时不进行数据同步
|
|
|
+ if(device.getDeviceStatus() == 2){
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ else {
|
|
|
+ MeterPushData mpd = new MeterPushData();
|
|
|
+ mpd.setCustomerId(device.getCustomerId());
|
|
|
+ mpd.setDeviceId(device.getId());
|
|
|
+ mpd.setDeviceNo(device.getDeviceNo());
|
|
|
+ mpd.setDeviceStatus(device.getDeviceStatus());
|
|
|
+ mpd.setMeterNo(device.getWaterMeterNo());
|
|
|
+ mpd.setSyncDataDate(new Date());
|
|
|
+ mpd.setSyncDate(today);
|
|
|
+ // 非故障设备时判断是否有抄表数据
|
|
|
+ if ("1".equals(record.getReadStatus())) {
|
|
|
+ // 无抄表数据,则继续取上日的推送数据进行推送,若上一日无推送数据则取最近一次的抄表数据进行推送
|
|
|
+ MeterPushData pushData = meterPushDataMapper.selectByPrimaryKey(device.getId(), yesterday);
|
|
|
+ if(pushData == null ){
|
|
|
+ MeterReadRecord meterLastReadRecord = meterReadRecordMapper.findMeterLastReadRecord(deviceId);
|
|
|
+ mpd.setRealData(meterLastReadRecord.getReadData());
|
|
|
+ mpd.setRealDataDate(meterLastReadRecord.getReadTime());
|
|
|
+ mpd.setSyncData(meterLastReadRecord.getReadData());
|
|
|
+ mpd.setGap(new BigDecimal(0));
|
|
|
+ }
|
|
|
+ else{
|
|
|
+ mpd.setSyncData(pushData.getSyncData());
|
|
|
+ mpd.setGap(new BigDecimal(0));
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ // 有抄表数据,则判断与上日推送数据是否存在-3内的波动差
|
|
|
+ MeterPushData pushData = meterPushDataMapper.selectByPrimaryKey(device.getId(), yesterday);
|
|
|
+ // 若不存在推送数据,则直接推送今日抄表数据
|
|
|
+ if (pushData == null) {
|
|
|
+ mpd.setRealData(record.getReadData());
|
|
|
+ mpd.setRealDataDate(record.getReadTime());
|
|
|
+ mpd.setSyncData(record.getReadData());
|
|
|
+ mpd.setGap(new BigDecimal(0));
|
|
|
+ } else {
|
|
|
+ // 若存在推送数据则比较今日数据真实上报数据进行比较,若比上日推送数据大或者波动差>=-3则直接进行推送,
|
|
|
+ // 若波动差<-3则推送上日的推送数据
|
|
|
+ mpd.setRealData(record.getReadData());
|
|
|
+ mpd.setRealDataDate(record.getReadTime());
|
|
|
+ BigDecimal syncData = new BigDecimal(pushData.getSyncData());
|
|
|
+ BigDecimal readData = new BigDecimal(record.getReadData());
|
|
|
+ BigDecimal gap = readData.subtract(syncData);
|
|
|
+ // 今日上报数据大于等于昨日推送数据,则推送今日上报数据
|
|
|
+ if (gap.compareTo(new BigDecimal(0)) > -1) {
|
|
|
+ mpd.setSyncData(record.getReadData());
|
|
|
+ mpd.setGap(gap);
|
|
|
+ } else {
|
|
|
+ // 若波动差>-3,即波动差的绝对值小于3 ,则推送上日的推送数据
|
|
|
+ if (gap.abs().compareTo(new BigDecimal(3)) < 1) {
|
|
|
+ mpd.setSyncData(pushData.getSyncData());
|
|
|
+ mpd.setGap(gap);
|
|
|
+ }
|
|
|
+ // 波动差<=-3,即波动差的绝对值>3,则直接进行推送
|
|
|
+ else {
|
|
|
+ mpd.setSyncData(record.getReadData());
|
|
|
+ mpd.setGap(gap);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ // 将推送记录进行存储,并将数据进行推送
|
|
|
+ meterPushDataMapper.insert(mpd);
|
|
|
+ Map<String, Object> message = buildSendMessage(mpd);
|
|
|
+ if (message != null) {
|
|
|
+ // 3,发送消息
|
|
|
+ sendMessage(config,JSON.toJSONString(message));
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }catch (Exception e){
|
|
|
+ log.error("Synchronize Data Error ! ",e);
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ protected Map<String,Object> buildSendMessage(MeterPushData pushData){
|
|
|
+ HashMap<String, Object> msg = new HashMap<>();
|
|
|
+ msg.put("type","nbxb");
|
|
|
+ // 设备编号
|
|
|
+ String deviceId= pushData.getMeterNo();
|
|
|
+ if(StringUtils.isNotBlank(deviceId)) {
|
|
|
+ // 设备编码补齐14位后再添加30
|
|
|
+ msg.put("deviceId","30"+deviceId) ;
|
|
|
+ List<MessageData> datas = new ArrayList<>() ;
|
|
|
+ MessageData data = new MessageData();
|
|
|
+ data.setS("v");
|
|
|
+ data.setT(System.currentTimeMillis());
|
|
|
+ data.setV(Double.parseDouble(pushData.getSyncData()));
|
|
|
+ datas.add(data);
|
|
|
+ msg.put("real",datas);
|
|
|
+ }
|
|
|
+ else{
|
|
|
+ msg = null ;
|
|
|
+ }
|
|
|
+ return msg ;
|
|
|
+ }
|
|
|
+ /**
|
|
|
+ * {"real":[{"s":"v","t":1578534831000,"v":39.0}],"type":"nbxb","deviceId":"30000161034460"}
|
|
|
+ * @param record
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ @Deprecated
|
|
|
+ protected Map<String,Object> buildSendMessage(MeterReadRecord record){
|
|
|
+ HashMap<String, Object> msg = new HashMap<>();
|
|
|
+ msg.put("type","nbxb");
|
|
|
+
|
|
|
+ // 设备编号
|
|
|
+ String deviceId= record.getMeterNo();
|
|
|
+ if(StringUtils.isNotBlank(deviceId)) {
|
|
|
+ // 设备编码补齐14位后再添加30
|
|
|
+ msg.put("deviceId","30"+deviceId) ;
|
|
|
+ List<MessageData> datas = new ArrayList<>() ;
|
|
|
+ MessageData data = new MessageData();
|
|
|
+ data.setS("v");
|
|
|
+ //data.setT(record.getReadTime().getTime());
|
|
|
+ data.setT(System.currentTimeMillis());
|
|
|
+ data.setV(Double.parseDouble(record.getReadData()));
|
|
|
+ datas.add(data);
|
|
|
+ msg.put("real",datas);
|
|
|
+ }
|
|
|
+ else{
|
|
|
+ msg = null ;
|
|
|
+ }
|
|
|
+ return msg ;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ protected void sendMessage(ScRabbitConfig config, String message){
|
|
|
+ log.info("begin send message ,config = {},message = {}", config, message);
|
|
|
+ //设置当前线程lookupKey,内部由ThreadLocal实现
|
|
|
+ SimpleResourceHolder.bind(rabbitTemplate.getConnectionFactory(), config.getCode());
|
|
|
+ //业务操作会根据线程中的lookupKey从routeConnectionFactory的targetConnectionFactories中选择对应的connectionFactory
|
|
|
+ rabbitTemplate.convertAndSend(config.getExchange(),config.getRoutingKey(), message);
|
|
|
+ //操作完以后记得解绑。不影响线程的后序其他工厂操作
|
|
|
+ SimpleResourceHolder.unbind(rabbitTemplate.getConnectionFactory());
|
|
|
+ log.info("end send message ,config = {},message = {}", config, message);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 字符串高位补零
|
|
|
+ * @param str
|
|
|
+ * @param strLength
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ protected static String addZeroForString(String str, int strLength) {
|
|
|
+ int strLen = str.length();
|
|
|
+ if (strLen < strLength) {
|
|
|
+ while (strLen < strLength) {
|
|
|
+ StringBuffer sb = new StringBuffer();
|
|
|
+ // 左补0
|
|
|
+ sb.append("0").append(str);
|
|
|
+ str = sb.toString();
|
|
|
+ strLen = str.length();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ return str;
|
|
|
+ }
|
|
|
+
|
|
|
+}
|