| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332 |
- package com.huaxu.zoniot.service.impl;
- import cn.hutool.core.date.DateUtil;
- import com.alibaba.fastjson.JSON;
- import com.alibaba.fastjson.JSONArray;
- import com.alibaba.fastjson.JSONObject;
- import com.huaxu.zoniot.dao.*;
- import com.huaxu.zoniot.model.DeviceData;
- import com.huaxu.zoniot.utils.HttpClientUtils;
- import com.huaxu.zoniot.entity.DeviceInfoPushConfig;
- import com.huaxu.zoniot.entity.MeterInfo;
- import com.huaxu.zoniot.entity.PushLog;
- import com.huaxu.zoniot.service.MeterFileSyncService;
- import com.huaxu.zoniot.utils.Sha256Util;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.commons.collections.MapUtils;
- import org.apache.commons.lang3.StringUtils;
- import org.springframework.amqp.rabbit.core.RabbitTemplate;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.beans.factory.annotation.Value;
- import org.springframework.stereotype.Service;
- import javax.annotation.Resource;
- import java.nio.charset.Charset;
- import java.util.*;
- /**
- * <p></p>
- *
- * @Author wilian.peng
- * @Date 2021/1/27 16:28
- * @Version 1.0
- */
- @Slf4j
- @Service
- public class MeterFileSyncServiceImpl implements MeterFileSyncService {
- @Autowired
- DeviceInfoPushConfigMapper deviceInfoPushConfigMapper ;
- @Autowired
- WaterMeterMapper waterMeterMapper;
- @Autowired
- PushLogMapper pushLogMapper;
- @Autowired
- MeterInfoMapper meterInfoMapper;
- @Autowired
- CustomerMapper customerMapper;
- @Resource
- private RabbitTemplate rabbitTemplate;
- @Value("${spring.rabbitmq.exchange}")
- private String exchange;
- @Value("${spring.rabbitmq.device.queue}")
- private String queue;
- @Override
- public int httpSync(Integer configId) {
- int total = 0 ;
- DeviceInfoPushConfig config = deviceInfoPushConfigMapper.findConfigById(configId);
- if(config != null ){
- // 1,查询推送记录,1档案;2数据
- PushLog lastLog = pushLogMapper.findLastRecord(config.getId(), 1);
- // 2,计算推送数据时间;如果为首次推送,则判断是否需要将历史数据推送给对方;
- Date beginDate = null ;
- Date endDate = new Date();
- Integer status = null ;
- if(lastLog != null) {
- beginDate = lastLog.getPushTime() ;
- }
- else {
- // 首次推送,只推送有效的设备
- status = 1 ;
- beginDate = DateUtil.parse("1900-01-01", "yyyy-MM-dd");
- }
- // 3,根据推送数据时间进行数据查询
- String pushCustomers = config.getPushCustomers();
- String pushCommunitys = config.getPushCommunitys();
- String pushBuildings = config.getPushBuildings();
- String pushChannels = config.getPushChannels();
- String pushDeviceTypes = config.getPushDeviceTypes();
- log.info("begin query push device list , push config = {}, beginDate={}, endDate={}",
- JSON.toJSONString(config), beginDate, endDate);
- List<Map<String, Object>> dataList = waterMeterMapper.getPushMeterList(pushCustomers,
- pushCommunitys,
- pushBuildings,
- pushChannels,
- pushDeviceTypes,
- status,
- beginDate,
- endDate);
- total = dataList.size();
- log.info("end query push device list , size = {}",total);
- if(dataList.size() != 0) {
- sendBatchData(dataList,beginDate,endDate,config);
- }
- }
- return total;
- }
- @Override
- public int fuJinSync(Map<String, Object> param) {
- Integer customerId = MapUtils.getInteger(param,"customerId");
- String getUrl = MapUtils.getString(param,"getUrl");
- int result = 0 ;
- // 1,查询是否是第一次进行同步数据
- Integer count = meterInfoMapper.countByCustomer(customerId);
- // 2,如果是第一次进行数据同步,则建档时间定再1900,进行第一次全量同步,如果不是第一次进行数据同步,则进行增量同步
- Date fromDate ;
- Date toDate = new Date();
- if(count == 0) {
- fromDate = DateUtil.parse("1900-01-01", "yyyy-MM-dd");
- }
- else {
- fromDate = DateUtil.offsetDay(DateUtil.date(),-1) ;
- }
- // 3,构建请求JSON,并进行请求
- String jsonReq = buildJSONRequestContent(fromDate,toDate);
- String resp = null ;
- try {
- log.info("begin http request , url = {} , content = {}",getUrl,jsonReq);
- resp = HttpClientUtils.doPostWithJson(getUrl, jsonReq);
- log.info("end http request , url = {} , response = {}",getUrl,resp);
- } catch (Exception e) {
- e.printStackTrace();
- log.error("同步[富锦]水表档案信息异常", e);
- }
- // 4,将返回的数据保存到数据库
- if(StringUtils.isNotBlank(resp)) {
- result = processResponseContent(customerId,resp);
- }
- return result;
- }
- /**
- * {
- * "result":"SUCCESS",
- * "msg":"操作成功。",
- * "packet":[
- * {
- * "areaName":"演示小区(晨晖)",
- * "userAddr":"1-1-111",
- * "userNumber":"000045612345",
- * "userName":"测试",
- * "PhoneNumber":"18866664444",
- * "archiveTime":"2019-10-30 15:59:06.0",
- * "meterNumber":"15409907290001"
- * },{},...
- * ]
- * }
- * @param response
- */
- protected int processResponseContent(Integer customerId , String response) {
- int count = 0 ;
- Map<String,Object> respMap = JSON.parseObject(response, Map.class);
- String result = MapUtils.getString(respMap, "result");
- if("SUCCESS".equals(result)) {
- JSONArray infoList = (JSONArray)respMap.get("packet");
- if(infoList.size() != 0) {
- List<MeterInfo> meterInfoList = new ArrayList<MeterInfo>();
- for(int i = 0 ; i < infoList.size() ; i++) {
- JSONObject obj = (JSONObject)infoList.get(i);
- MeterInfo m = new MeterInfo();
- m.setCustomerId(customerId);
- m.setMeterNo(obj.getString("meterNumber"));
- m.setAreaName(obj.getString("areaName"));
- m.setUserAddress(obj.getString("userAddr"));
- m.setUserName(obj.getString("userName"));
- m.setUserNo(obj.getString("userNumber"));
- m.setUserPhone(obj.getString("PhoneNumber"));
- m.setCaliber(obj.getString("caliber"));
- m.setFactory(obj.getString("mfactory"));
- m.setArchiveTime(DateUtil.parse(obj.getString("archiveTime"), "yyyy-MM-dd HH:mm:ss.SSS"));
- m.setDateCreate(new Date());
- meterInfoList.add(m);
- }
- if(meterInfoList.size() != 0) {
- count = meterInfoList.size();
- meterInfoMapper.insertList(meterInfoList);
- }
- }
- }
- return count;
- }
- /**
- * <p>
- * {
- * "packet": {
- * "customer":"CESHI",
- * "systemCode": "cx.100.100",
- * "fromDate": "2018-05-01",
- * "toDate": "2018-05-10",
- * "meterType":"wlwsb" --表具类型wlwsb-物联网水表 yxsb-有线水表 ,wxsb-无线水表 yjdyhdb-液晶多用户电表 dxkb-单项卡式电表
- * },
- * "checkSum": "XXXXXYYYYY" -- SHA256对packet内容(包含{})计算密文
- * }
- * </p>
- * @return
- */
- protected String buildJSONRequestContent(Date fromDate ,Date toDate) {
- Map<String,Object> reqMap = new HashMap<String,Object>();
- Map<String,Object> packetMap = new HashMap<String,Object>();
- packetMap.put("customer", "FUJIN");
- packetMap.put("systemCode", "cx.100.100");
- packetMap.put("fromDate", DateUtil.format(fromDate, "yyyy-MM-dd"));
- packetMap.put("toDate", DateUtil.format(toDate, "yyyy-MM-dd"));
- packetMap.put("meterType", "wlwsb");
- String packetStr = JSON.toJSONString(packetMap);
- String checkSum = generateCheckSumWithSHA256(packetStr);
- reqMap.put("packet", packetMap);
- reqMap.put("checkSum", checkSum);
- return JSON.toJSONString(reqMap);
- }
- protected String generateCheckSumWithSHA256(String src) {
- String dest = Sha256Util.getSHA256(src);
- return dest ;
- }
- protected void sendBatchData(List<Map<String,Object>> dataMapList ,Date beginDate ,Date endDate , DeviceInfoPushConfig config) {
- // 1,定义push日志
- PushLog pushLog = new PushLog();
- pushLog.setDataSize(dataMapList.size());
- pushLog.setPushConfigId(config.getId());
- pushLog.setDateCreate(new Date());
- pushLog.setPushTime(endDate);
- pushLog.setPushContent(1);
- try {
- // 2,发送数据并解析返回
- String jsonContent = buildJSONContent(dataMapList, beginDate, endDate);
- String response = callPushUrl(config.getPushUrl() ,jsonContent);
- pushLog.setResponse(response);
- boolean isSuccess = parseResponse(response);
- pushLog.setPushStatus(isSuccess ? 1 : 0);
- }catch (Exception e) {
- e.printStackTrace();
- log.error("meter file push failed !");
- //推送失败
- pushLog.setPushStatus(0);
- pushLog.setRemark(e.getMessage().length() > 512 ? e.getMessage().substring(0, 512) : e.getMessage());
- }finally {
- // 3,保存推送日志
- pushLogMapper.insert(pushLog);
- }
- }
- protected String buildJSONContent(List<Map<String,Object>> dataMapList ,Date beginDate ,Date endDate) {
- Map<String ,Object> jsonMap = new HashMap<>();
- jsonMap.put("startDate", DateUtil.format(beginDate, "yyyy-MM-dd HH:mm:ss"));
- jsonMap.put("endDate", DateUtil.format(endDate, "yyyy-MM-dd HH:mm:ss"));
- jsonMap.put("count", dataMapList.size());
- jsonMap.put("dataList",dataMapList);
- String jsonString = JSON.toJSONString(jsonMap);
- return jsonString;
- }
- protected String callPushUrl(String pushUrl,String jsonContent) throws Exception {
- String response ;
- response = HttpClientUtils.doPostWithJson(pushUrl, jsonContent);
- return response ;
- }
- protected boolean parseResponse(String response) {
- boolean isSuccess = false ;
- Map rspObj = JSON.parseObject(response, Map.class);
- Integer code = MapUtils.getInteger(rspObj, "code");
- if(code == 0 ) {isSuccess = true ;}
- return isSuccess;
- }
- @Override
- public int rabbitSync() {
- List<Integer> customerIds = new ArrayList<>();
- customerIds.add(127);
- if (customerIds != null && customerIds.size() > 0) {
- for (Integer customerId : customerIds) {
- // 1,查询推送记录,1档案;2数据
- PushLog lastLog = pushLogMapper.findLastRecordBycustomerId(customerId,3);
- // 2,计算推送数据时间;如果为首次推送,则判断是否需要将历史数据推送给对方;
- Date beginDate = null ;
- Date endDate = new Date();
- Integer status = null ;
- if(lastLog != null) {
- beginDate = lastLog.getPushTime() ;
- }
- else {
- // 首次推送,只推送有效的设备
- status = 1 ;
- beginDate = DateUtil.parse("1900-01-01", "yyyy-MM-dd");
- }
- // 3,根据推送数据时间进行数据查询
- List<Integer> cIds = customerMapper.getSubId(customerId);
- String pushCustomers = StringUtils.join(cIds, ",");
- String pushChannels = "40,55";
- log.info("begin query push device list , beginDate={}, endDate={}",
- beginDate, endDate);
- List<DeviceData> dataList = waterMeterMapper.getPushMeterListV2(pushCustomers,
- pushChannels,
- status,
- beginDate,
- endDate);
- log.info("end query push device list , size = {}",dataList.size());
- if(dataList.size() != 0) {
- sendRabbitData(dataList);
- }
- // 1,定义push日志
- PushLog pushLog = new PushLog();
- pushLog.setDataSize(dataList.size());
- pushLog.setCustomerId(customerId);
- pushLog.setDateCreate(new Date());
- pushLog.setPushTime(endDate);
- pushLog.setPushContent(3);
- pushLogMapper.insertSelective(pushLog);
- }
- }
- return 0;
- }
- private void sendRabbitData(List<DeviceData> dataList) {
- for (DeviceData data : dataList) {
- String msg = JSON.toJSONString(data);
- log.info("transfer mq msg:{}",msg);
- rabbitTemplate.convertAndSend(exchange, queue,msg.getBytes(Charset.forName("UTF-8")));
- }
- }
- }
|