ReportWaterPumpStateHandler.java 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195
  1. package com.huaxu.rabbitmq;
  2. import cn.hutool.core.collection.CollectionUtil;
  3. import com.alibaba.fastjson.JSONObject;
  4. import com.huaxu.dao.DeviceAttributeSpecsMapper;
  5. import com.huaxu.dao.ReportWaterPumpStateMapper;
  6. import com.huaxu.dto.AlarmDetailsDto;
  7. import com.huaxu.dto.watePump.ReprotWaterPumpQueryDto;
  8. import com.huaxu.entity.DeviceAttributeSpecsEntity;
  9. import com.huaxu.entity.MonitorDataEntity;
  10. import com.huaxu.entity.MonitorDataValueEntity;
  11. import com.huaxu.entity.ReportWaterPumpStateEntity;
  12. import com.huaxu.util.DatesUtil;
  13. import com.huaxu.util.RedisUtil;
  14. import io.swagger.models.auth.In;
  15. import lombok.extern.slf4j.Slf4j;
  16. import org.springframework.beans.factory.annotation.Autowired;
  17. import org.springframework.scheduling.annotation.Async;
  18. import org.springframework.stereotype.Component;
  19. import java.math.BigDecimal;
  20. import java.net.InetAddress;
  21. import java.net.UnknownHostException;
  22. import java.util.*;
  23. import java.util.stream.Collectors;
  24. /**
  25. * @ClassName ReportWaterPumpStateHandler
  26. * @Description: 水泵运行状态报表业务处理
  27. * 清洗数据上报,条件:属性参数类型为水泵状态(2)
  28. * 清洗规则:以分钟为单位计算水泵是否运行,如水泵从00:23:22开始运行, 到01:32:22是停止,
  29. * 则认为00:23-01:32分为水泵运行状态;
  30. * 如果数据存在跨天,需要重新新增一条新的数据
  31. * @Author lihui
  32. * @Date 2021/4/20
  33. * @Version V1.0
  34. **/
  35. @Component
  36. @Slf4j
  37. public class ReportWaterPumpStateHandler {
  38. private static String IP;
  39. @Autowired
  40. private ReportWaterPumpStateMapper reportWaterPumpStateMapper;
  41. @Autowired
  42. private DeviceAttributeSpecsMapper deviceAttributeSpecsMapper;
  43. @Async
  44. public void handler(MonitorDataEntity monitorDataEntity, JSONObject receiveData, Date receiveDateTime){
  45. log.info("【水泵运行状态报表:{}】开始处理", monitorDataEntity.getDeviceCode());
  46. // 0 运行, 1停止
  47. int state = 0;
  48. // 执行批量插入操作
  49. DeviceAttributeSpecsEntity entity = new DeviceAttributeSpecsEntity();
  50. List<ReportWaterPumpStateEntity> insertList = new ArrayList<>();
  51. ReprotWaterPumpQueryDto queryDto = getReprotWaterPumpQueryDto(monitorDataEntity);
  52. // 转换常量,保证获取是同一个对象
  53. // 避免同一时间内同一个code上报多条数据,保证数据更新和插入一致性
  54. // TODO 需要采用分布式锁解决,避免部署多台服务器
  55. synchronized (monitorDataEntity.getDeviceCode().intern()) {
  56. for (MonitorDataValueEntity valueEntity : monitorDataEntity.getDataValues()) {
  57. if (!receiveData.containsKey(valueEntity.getIdentifier())){
  58. continue;
  59. }
  60. entity.setAttributeId(valueEntity.getAttributeId());
  61. // 查找属性ID是否有设置运行、停止状态
  62. List<DeviceAttributeSpecsEntity> list = deviceAttributeSpecsMapper.findList(entity);
  63. if (CollectionUtil.isEmpty(list)) {
  64. continue;
  65. }
  66. List<String> listSate = toList(list);
  67. if (CollectionUtil.isEmpty(listSate)) {
  68. continue;
  69. }
  70. BigDecimal stateValue = new BigDecimal(receiveData.getDouble(valueEntity.getIdentifier()));
  71. state = listSate.contains(stateValue.toString()) ? 0 : 1;
  72. queryDto.setAttributeId(valueEntity.getAttributeId());
  73. addOrUpdate(queryDto.getMd5Query(), monitorDataEntity, valueEntity, insertList, state, receiveDateTime);
  74. }
  75. // 最后一次全部插入
  76. if (insertList.size() > 0 ){
  77. reportWaterPumpStateMapper.batchInsertReportWaterPumpState(insertList);
  78. }
  79. }
  80. log.info("【水泵运行状态报表:{}】结束处理,新增:{}", monitorDataEntity.getDeviceCode(),insertList.size());
  81. }
  82. private List<String> toList(List<DeviceAttributeSpecsEntity> list){
  83. List<String> result = new ArrayList<>();
  84. for (DeviceAttributeSpecsEntity deviceAttributeSpecsEntity: list) {
  85. if (deviceAttributeSpecsEntity.getSpecsName() != null &&
  86. deviceAttributeSpecsEntity.getSpecsName().indexOf("运行") != -1) {
  87. result.add(deviceAttributeSpecsEntity.getSpecsValue());
  88. }
  89. }
  90. return result;
  91. }
  92. /**
  93. * @Author lihui
  94. * @Description 如果查询到数据,根据返回的state来判断数据
  95. * 如果返回的state和上报的state同一个状态,说明该状态一直在持续,我们不需要做任何的操作
  96. * 如果返回的state和上报的state不是同一个状态,说明state已经改变,我们需要更新上一次state所持续的最后时间,并新增一条目前的state的开始时间
  97. * 公式:同一id-> 00:23-00:28 运行时间
  98. * -> 00:28-00:30 停止时间
  99. * -> 00:30-null 运行时间
  100. * @Date 14:08 2021/4/21
  101. * @Param [md5Query :查询条件, monitorDataEntity, valueEntity, inserList :需要插入的数据集合, stateValue:泵站状态,receiveDateTime:上报数据的时间]
  102. * @return
  103. **/
  104. private void addOrUpdate(String md5Query , MonitorDataEntity monitorDataEntity,
  105. MonitorDataValueEntity valueEntity ,List<ReportWaterPumpStateEntity> inserList,
  106. int stateValue, Date receiveDateTime){
  107. ReportWaterPumpStateEntity stateEntity = reportWaterPumpStateMapper.findReportWaterPumpState(md5Query);
  108. if (stateEntity == null) {
  109. inserList.add(getReportWaterPumpStateEntity(monitorDataEntity, valueEntity, receiveDateTime, stateValue));
  110. return;
  111. }
  112. Date beginTime = null;
  113. // 计算出时间差 等于0表示同一天的数据,大于0表示跨天
  114. int day = timeDifference(stateEntity.getStateBeginTime(), receiveDateTime);
  115. boolean theSameSate = stateEntity.getState().intValue() == stateValue;
  116. // 跨天需要新增一条数据,如果是跨一天的数据起始时间为 receiveDateTime + 00:00:00, 跨多天就按当前上报的时间
  117. if (day != 0) {
  118. beginTime = day == 1 ? parseDate(receiveDateTime, "00:00:00") : receiveDateTime;
  119. reportWaterPumpStateMapper.updateReportWaterPumpState(stateEntity.getId(), parseDate(stateEntity.getStateBeginTime(), "23:59:59"), new Date());
  120. inserList.add(getReportWaterPumpStateEntity(monitorDataEntity, valueEntity, beginTime, stateValue));
  121. return;
  122. }
  123. // 同一个状态的数据
  124. if (theSameSate) {
  125. return;
  126. }
  127. reportWaterPumpStateMapper.updateReportWaterPumpState(stateEntity.getId(), receiveDateTime, new Date());
  128. inserList.add(getReportWaterPumpStateEntity(monitorDataEntity, valueEntity, receiveDateTime, stateValue));
  129. }
  130. private ReprotWaterPumpQueryDto getReprotWaterPumpQueryDto(MonitorDataEntity monitorDataEntity){
  131. ReprotWaterPumpQueryDto queryDto = new ReprotWaterPumpQueryDto();
  132. queryDto.setYear(monitorDataEntity.getYear());
  133. queryDto.setMonth(monitorDataEntity.getMonth());
  134. queryDto.setDay(monitorDataEntity.getDay());
  135. queryDto.setTenantId(monitorDataEntity.getTenantId());
  136. queryDto.setDeviceId(monitorDataEntity.getDeviceId());
  137. queryDto.setDeviceCode(monitorDataEntity.getDeviceCode());
  138. return queryDto;
  139. }
  140. private ReportWaterPumpStateEntity getReportWaterPumpStateEntity(MonitorDataEntity monitorDataEntity, MonitorDataValueEntity monitorDataValueEntity,
  141. Date receiveDateTime, Integer state){
  142. Date nowTime = new Date();
  143. ReportWaterPumpStateEntity entity = new ReportWaterPumpStateEntity();
  144. entity.setState(state);
  145. entity.setTenantId(monitorDataEntity.getTenantId());
  146. entity.setDeviceId(monitorDataEntity.getDeviceId());
  147. entity.setDeviceCode(monitorDataEntity.getDeviceCode());
  148. entity.setDeviceName(monitorDataEntity.getDeviceName());
  149. entity.setYear(monitorDataEntity.getYear());
  150. entity.setMonth(monitorDataEntity.getMonth());
  151. entity.setDay(monitorDataEntity.getDay());
  152. entity.setAttributeId(monitorDataValueEntity.getAttributeId());
  153. entity.setAttributeName(monitorDataValueEntity.getAttributeName());
  154. entity.setStateBeginTime(receiveDateTime);
  155. entity.setDateCreate(nowTime);
  156. entity.setDateUpdate(nowTime);
  157. entity.setMd5Query(entity.getMd5());
  158. entity.setIp(getIp());
  159. return entity;
  160. }
  161. private int timeDifference(Date beginTime, Date endTime){
  162. return Integer.parseInt(DatesUtil.formatDate(endTime, "yyyyMMdd")) - Integer.parseInt(DatesUtil.formatDate(beginTime, "yyyyMMdd"));
  163. }
  164. private Date parseDate(Date date, String appendTo){
  165. return DatesUtil.parseDate(DatesUtil.formatDate(date, "yyyy-MM-dd") + " " + appendTo,
  166. "yyyy-MM-dd HH:mm:ss");
  167. }
  168. public String getIp(){
  169. try {
  170. if (IP == null) {
  171. IP = InetAddress.getLocalHost().getHostAddress();
  172. }
  173. } catch (UnknownHostException e) {
  174. e.printStackTrace();
  175. }
  176. return IP;
  177. }
  178. }