MeterFileSyncServiceImpl.java 13 KB


  1. package com.huaxu.zoniot.service.impl;
  2. import cn.hutool.core.date.DateUtil;
  3. import com.alibaba.fastjson.JSON;
  4. import com.alibaba.fastjson.JSONArray;
  5. import com.alibaba.fastjson.JSONObject;
  6. import com.huaxu.zoniot.dao.*;
  7. import com.huaxu.zoniot.model.DeviceData;
  8. import com.huaxu.zoniot.utils.HttpClientUtils;
  9. import com.huaxu.zoniot.entity.DeviceInfoPushConfig;
  10. import com.huaxu.zoniot.entity.MeterInfo;
  11. import com.huaxu.zoniot.entity.PushLog;
  12. import com.huaxu.zoniot.service.MeterFileSyncService;
  13. import com.huaxu.zoniot.utils.Sha256Util;
  14. import lombok.extern.slf4j.Slf4j;
  15. import org.apache.commons.collections.MapUtils;
  16. import org.apache.commons.lang3.StringUtils;
  17. import org.springframework.amqp.rabbit.core.RabbitTemplate;
  18. import org.springframework.beans.factory.annotation.Autowired;
  19. import org.springframework.beans.factory.annotation.Value;
  20. import org.springframework.stereotype.Service;
  21. import javax.annotation.Resource;
  22. import java.nio.charset.Charset;
  23. import java.util.*;
  24. /**
  25. * <p></p>
  26. *
  27. * @Author wilian.peng
  28. * @Date 2021/1/27 16:28
  29. * @Version 1.0
  30. */
  31. @Slf4j
  32. @Service
  33. public class MeterFileSyncServiceImpl implements MeterFileSyncService {
  34. @Autowired
  35. DeviceInfoPushConfigMapper deviceInfoPushConfigMapper ;
  36. @Autowired
  37. WaterMeterMapper waterMeterMapper;
  38. @Autowired
  39. PushLogMapper pushLogMapper;
  40. @Autowired
  41. MeterInfoMapper meterInfoMapper;
  42. @Autowired
  43. CustomerMapper customerMapper;
  44. @Resource
  45. private RabbitTemplate rabbitTemplate;
  46. @Value("${spring.rabbitmq.exchange}")
  47. private String exchange;
  48. @Value("${spring.rabbitmq.device.queue}")
  49. private String queue;
  50. @Override
  51. public int httpSync(Integer configId) {
  52. int total = 0 ;
  53. DeviceInfoPushConfig config = deviceInfoPushConfigMapper.findConfigById(configId);
  54. if(config != null ){
  55. // 1,查询推送记录,1档案;2数据
  56. PushLog lastLog = pushLogMapper.findLastRecord(config.getId(), 1);
  57. // 2,计算推送数据时间;如果为首次推送,则判断是否需要将历史数据推送给对方;
  58. Date beginDate = null ;
  59. Date endDate = new Date();
  60. Integer status = null ;
  61. if(lastLog != null) {
  62. beginDate = lastLog.getPushTime() ;
  63. }
  64. else {
  65. // 首次推送,只推送有效的设备
  66. status = 1 ;
  67. beginDate = DateUtil.parse("1900-01-01", "yyyy-MM-dd");
  68. }
  69. // 3,根据推送数据时间进行数据查询
  70. String pushCustomers = config.getPushCustomers();
  71. String pushCommunitys = config.getPushCommunitys();
  72. String pushBuildings = config.getPushBuildings();
  73. String pushChannels = config.getPushChannels();
  74. String pushDeviceTypes = config.getPushDeviceTypes();
  75. log.info("begin query push device list , push config = {}, beginDate={}, endDate={}",
  76. JSON.toJSONString(config), beginDate, endDate);
  77. List<Map<String, Object>> dataList = waterMeterMapper.getPushMeterList(pushCustomers,
  78. pushCommunitys,
  79. pushBuildings,
  80. pushChannels,
  81. pushDeviceTypes,
  82. status,
  83. beginDate,
  84. endDate);
  85. total = dataList.size();
  86. log.info("end query push device list , size = {}",total);
  87. if(dataList.size() != 0) {
  88. sendBatchData(dataList,beginDate,endDate,config);
  89. }
  90. }
  91. return total;
  92. }
  93. @Override
  94. public int fuJinSync(Map<String, Object> param) {
  95. Integer customerId = MapUtils.getInteger(param,"customerId");
  96. String getUrl = MapUtils.getString(param,"getUrl");
  97. int result = 0 ;
  98. // 1,查询是否是第一次进行同步数据
  99. Integer count = meterInfoMapper.countByCustomer(customerId);
  100. // 2,如果是第一次进行数据同步,则建档时间定再1900,进行第一次全量同步,如果不是第一次进行数据同步,则进行增量同步
  101. Date fromDate ;
  102. Date toDate = new Date();
  103. if(count == 0) {
  104. fromDate = DateUtil.parse("1900-01-01", "yyyy-MM-dd");
  105. }
  106. else {
  107. fromDate = DateUtil.offsetDay(DateUtil.date(),-1) ;
  108. }
  109. // 3,构建请求JSON,并进行请求
  110. String jsonReq = buildJSONRequestContent(fromDate,toDate);
  111. String resp = null ;
  112. try {
  113. log.info("begin http request , url = {} , content = {}",getUrl,jsonReq);
  114. resp = HttpClientUtils.doPostWithJson(getUrl, jsonReq);
  115. log.info("end http request , url = {} , response = {}",getUrl,resp);
  116. } catch (Exception e) {
  117. e.printStackTrace();
  118. log.error("同步[富锦]水表档案信息异常", e);
  119. }
  120. // 4,将返回的数据保存到数据库
  121. if(StringUtils.isNotBlank(resp)) {
  122. result = processResponseContent(customerId,resp);
  123. }
  124. return result;
  125. }
  126. /**
  127. * {
  128. * "result":"SUCCESS",
  129. * "msg":"操作成功。",
  130. * "packet":[
  131. * {
  132. * "areaName":"演示小区(晨晖)",
  133. * "userAddr":"1-1-111",
  134. * "userNumber":"000045612345",
  135. * "userName":"测试",
  136. * "PhoneNumber":"18866664444",
  137. * "archiveTime":"2019-10-30 15:59:06.0",
  138. * "meterNumber":"15409907290001"
  139. * },{},...
  140. * ]
  141. * }
  142. * @param response
  143. */
  144. protected int processResponseContent(Integer customerId , String response) {
  145. int count = 0 ;
  146. Map<String,Object> respMap = JSON.parseObject(response, Map.class);
  147. String result = MapUtils.getString(respMap, "result");
  148. if("SUCCESS".equals(result)) {
  149. JSONArray infoList = (JSONArray)respMap.get("packet");
  150. if(infoList.size() != 0) {
  151. List<MeterInfo> meterInfoList = new ArrayList<MeterInfo>();
  152. for(int i = 0 ; i < infoList.size() ; i++) {
  153. JSONObject obj = (JSONObject)infoList.get(i);
  154. MeterInfo m = new MeterInfo();
  155. m.setCustomerId(customerId);
  156. m.setMeterNo(obj.getString("meterNumber"));
  157. m.setAreaName(obj.getString("areaName"));
  158. m.setUserAddress(obj.getString("userAddr"));
  159. m.setUserName(obj.getString("userName"));
  160. m.setUserNo(obj.getString("userNumber"));
  161. m.setUserPhone(obj.getString("PhoneNumber"));
  162. m.setCaliber(obj.getString("caliber"));
  163. m.setFactory(obj.getString("mfactory"));
  164. m.setArchiveTime(DateUtil.parse(obj.getString("archiveTime"), "yyyy-MM-dd HH:mm:ss.SSS"));
  165. m.setDateCreate(new Date());
  166. meterInfoList.add(m);
  167. }
  168. if(meterInfoList.size() != 0) {
  169. count = meterInfoList.size();
  170. meterInfoMapper.insertList(meterInfoList);
  171. }
  172. }
  173. }
  174. return count;
  175. }
  176. /**
  177. * <p>
  178. * {
  179. * "packet": {
  180. * "customer":"CESHI",
  181. * "systemCode": "cx.100.100",
  182. * "fromDate": "2018-05-01",
  183. * "toDate": "2018-05-10",
  184. * "meterType":"wlwsb" --表具类型wlwsb-物联网水表 yxsb-有线水表 ,wxsb-无线水表 yjdyhdb-液晶多用户电表 dxkb-单项卡式电表
  185. * },
  186. * "checkSum": "XXXXXYYYYY" -- SHA256对packet内容(包含{})计算密文
  187. * }
  188. * </p>
  189. * @return
  190. */
  191. protected String buildJSONRequestContent(Date fromDate ,Date toDate) {
  192. Map<String,Object> reqMap = new HashMap<String,Object>();
  193. Map<String,Object> packetMap = new HashMap<String,Object>();
  194. packetMap.put("customer", "FUJIN");
  195. packetMap.put("systemCode", "cx.100.100");
  196. packetMap.put("fromDate", DateUtil.format(fromDate, "yyyy-MM-dd"));
  197. packetMap.put("toDate", DateUtil.format(toDate, "yyyy-MM-dd"));
  198. packetMap.put("meterType", "wlwsb");
  199. String packetStr = JSON.toJSONString(packetMap);
  200. String checkSum = generateCheckSumWithSHA256(packetStr);
  201. reqMap.put("packet", packetMap);
  202. reqMap.put("checkSum", checkSum);
  203. return JSON.toJSONString(reqMap);
  204. }
  205. protected String generateCheckSumWithSHA256(String src) {
  206. String dest = Sha256Util.getSHA256(src);
  207. return dest ;
  208. }
  209. protected void sendBatchData(List<Map<String,Object>> dataMapList ,Date beginDate ,Date endDate , DeviceInfoPushConfig config) {
  210. // 1,定义push日志
  211. PushLog pushLog = new PushLog();
  212. pushLog.setDataSize(dataMapList.size());
  213. pushLog.setPushConfigId(config.getId());
  214. pushLog.setDateCreate(new Date());
  215. pushLog.setPushTime(endDate);
  216. pushLog.setPushContent(1);
  217. try {
  218. // 2,发送数据并解析返回
  219. String jsonContent = buildJSONContent(dataMapList, beginDate, endDate);
  220. String response = callPushUrl(config.getPushUrl() ,jsonContent);
  221. pushLog.setResponse(response);
  222. boolean isSuccess = parseResponse(response);
  223. pushLog.setPushStatus(isSuccess ? 1 : 0);
  224. }catch (Exception e) {
  225. e.printStackTrace();
  226. log.error("meter file push failed !");
  227. //推送失败
  228. pushLog.setPushStatus(0);
  229. pushLog.setRemark(e.getMessage().length() > 512 ? e.getMessage().substring(0, 512) : e.getMessage());
  230. }finally {
  231. // 3,保存推送日志
  232. pushLogMapper.insert(pushLog);
  233. }
  234. }
  235. protected String buildJSONContent(List<Map<String,Object>> dataMapList ,Date beginDate ,Date endDate) {
  236. Map<String ,Object> jsonMap = new HashMap<>();
  237. jsonMap.put("startDate", DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss"));
  238. jsonMap.put("endDate", DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss"));
  239. jsonMap.put("count", dataMapList.size());
  240. jsonMap.put("dataList",dataMapList);
  241. String jsonString = JSON.toJSONString(jsonMap);
  242. return jsonString;
  243. }
  244. protected String callPushUrl(String pushUrl,String jsonContent) throws Exception {
  245. String response ;
  246. response = HttpClientUtils.doPostWithJson(pushUrl, jsonContent);
  247. return response ;
  248. }
  249. protected boolean parseResponse(String response) {
  250. boolean isSuccess = false ;
  251. Map rspObj = JSON.parseObject(response, Map.class);
  252. Integer code = MapUtils.getInteger(rspObj, "code");
  253. if(code == 0 ) {isSuccess = true ;}
  254. return isSuccess;
  255. }
  256. @Override
  257. public int rabbitSync() {
  258. List<Integer> customerIds = new ArrayList<>();
  259. customerIds.add(127);
  260. if (customerIds != null && customerIds.size() > 0) {
  261. for (Integer customerId : customerIds) {
  262. // 1,查询推送记录,1档案;2数据
  263. PushLog lastLog = pushLogMapper.findLastRecordBycustomerId(customerId,3);
  264. // 2,计算推送数据时间;如果为首次推送,则判断是否需要将历史数据推送给对方;
  265. Date beginDate = null ;
  266. Date endDate = new Date();
  267. Integer status = null ;
  268. if(lastLog != null) {
  269. beginDate = lastLog.getPushTime() ;
  270. }
  271. else {
  272. // 首次推送,只推送有效的设备
  273. status = 1 ;
  274. beginDate = DateUtil.parse("1900-01-01", "yyyy-MM-dd");
  275. }
  276. // 3,根据推送数据时间进行数据查询
  277. List<Integer> cIds = customerMapper.getSubId(customerId);
  278. String pushCustomers = StringUtils.join(cIds, ",");
  279. String pushChannels = "40,55";
  280. log.info("begin query push device list , beginDate={}, endDate={}",
  281. beginDate, endDate);
  282. List<DeviceData> dataList = waterMeterMapper.getPushMeterListV2(pushCustomers,
  283. pushChannels,
  284. status,
  285. beginDate,
  286. endDate);
  287. log.info("end query push device list , size = {}",dataList.size());
  288. if(dataList.size() != 0) {
  289. sendRabbitData(dataList);
  290. }
  291. // 1,定义push日志
  292. PushLog pushLog = new PushLog();
  293. pushLog.setDataSize(dataList.size());
  294. pushLog.setCustomerId(customerId);
  295. pushLog.setDateCreate(new Date());
  296. pushLog.setPushTime(endDate);
  297. pushLog.setPushContent(3);
  298. pushLogMapper.insertSelective(pushLog);
  299. }
  300. }
  301. return 0;
  302. }
  303. private void sendRabbitData(List<DeviceData> dataList) {
  304. for (DeviceData data : dataList) {
  305. String msg = JSON.toJSONString(data);
  306. log.info("transfer mq msg:{}",msg);
  307. rabbitTemplate.convertAndSend(exchange, queue,msg.getBytes(Charset.forName("UTF-8")));
  308. }
  309. }
  310. }