package com.zcxk.meterreadingsystemv2.kafka; import com.zcxk.meterreadingsystemv2.common.JacksonUtil; import com.zcxk.meterreadingsystemv2.dao.TRemoteCustomerMapper; import com.zcxk.meterreadingsystemv2.dao.YcblhbMapper; import com.zcxk.meterreadingsystemv2.dbs.DynamicDataSourceContextHolder; import com.zcxk.meterreadingsystemv2.entity.AcceptData; import com.zcxk.meterreadingsystemv2.entity.AcceptListData; import com.zcxk.meterreadingsystemv2.entity.TRemoteCustomer; import com.zcxk.meterreadingsystemv2.entity.Ycblhb; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; import java.math.BigDecimal; /** * 接收验收立户消息类 */ @Slf4j @Component public class AddAccountReceiver { @Autowired YcblhbMapper ycblhbMapper; @Resource private TRemoteCustomerMapper tRemoteCustomerMapper; @KafkaListener(topics = {"${com.zcxk.kafka.add-account.topic}"}) public void receiveMessage(String message) { log.info("Kafka consumer Received :{}",message); try { AcceptData acceptData = JacksonUtil.string2Obj(message, AcceptData.class); String commOrgLevel = null; String commOrgIdenty = null; if (acceptData != null) { //萝北自来水公司立户 if("prd".equals(acceptData.getActive()) && acceptData.getCustomerId() == 25){ DynamicDataSourceContextHolder.setDataSourceKey("lb"); log.info("=====service当前连接的数据库是:" + DynamicDataSourceContextHolder.getDataSourceKey()); if (acceptData.getAcceptListData() != null && acceptData.getAcceptListData().size() > 0) { for (AcceptListData acceptListData : acceptData.getAcceptListData()) { Ycblhb ycblhb = new Ycblhb(); ycblhb.setYhdz(acceptListData.getBuildingName() + acceptListData.getLocation()); ycblhb.setSbdz(acceptListData.getBuildingName() + acceptListData.getLocation()); ycblhb.setBc("001"); ycblhb.setBsm(acceptListData.getWaterMeterNo()); ycblhb.setZdsj("010101"); ycblhb.setYys("020101"); ycblhb.setBw(acceptListData.getBuildingName() + acceptListData.getLocation()); ycblhb.setBxh("4"); ycblhb.setBcj("37"); //ycblhb.setBqds(acceptListData.getReadData()); ycblhb.setBqds(BigDecimal.ZERO); ycblhb.setNhrq(acceptListData.getInstallTime()); //log.info(JacksonUtil.obj2String(ycblhb)); ycblhbMapper.insertSelective(ycblhb); } } } //绥滨自来水公司立户 else if("prd".equals(acceptData.getActive()) && acceptData.getCustomerId() == 33){ DynamicDataSourceContextHolder.setDataSourceKey("suibin"); log.info("=====service当前连接的数据库是:" + DynamicDataSourceContextHolder.getDataSourceKey()); if (acceptData.getAcceptListData() != null && acceptData.getAcceptListData().size() > 0) { for (AcceptListData acceptListData : acceptData.getAcceptListData()) { Ycblhb ycblhb = new Ycblhb(); ycblhb.setYhdz(acceptListData.getBuildingName() + acceptListData.getLocation()); ycblhb.setSbdz(acceptListData.getBuildingName() + acceptListData.getLocation()); ycblhb.setBc("001"); ycblhb.setBsm(acceptListData.getWaterMeterNo()); ycblhb.setZdsj("010101"); ycblhb.setYys("020101"); ycblhb.setBw(acceptListData.getBuildingName() + acceptListData.getLocation()); ycblhb.setBxh("4"); ycblhb.setBcj("37"); //ycblhb.setBqds(acceptListData.getReadData()); ycblhb.setBqds(BigDecimal.ZERO); ycblhb.setNhrq(acceptListData.getInstallTime()); //log.info(JacksonUtil.obj2String(ycblhb)); ycblhbMapper.insertSelective(ycblhb); } } } //同江供水有限公司立户 else if("prd".equals(acceptData.getActive()) && acceptData.getCustomerId() == 57){ DynamicDataSourceContextHolder.setDataSourceKey("tongjiang"); log.info("=====service当前连接的数据库是:" + DynamicDataSourceContextHolder.getDataSourceKey()); commOrgLevel = "10001014"; commOrgIdenty = "10001014"; if (acceptData.getAcceptListData() != null && acceptData.getAcceptListData().size() > 0) { for (AcceptListData acceptListData : acceptData.getAcceptListData()) { Ycblhb ycblhb = new Ycblhb(); ycblhb.setYhdz(acceptListData.getBuildingName() + acceptListData.getLocation()); ycblhb.setSbdz(acceptListData.getBuildingName() + acceptListData.getLocation()); ycblhb.setBc("001"); ycblhb.setBsm(acceptListData.getWaterMeterNo()); ycblhb.setZdsj("010101"); ycblhb.setYys("020101"); ycblhb.setBw(acceptListData.getBuildingName() + acceptListData.getLocation()); ycblhb.setBxh("4"); //ycblhb.setBqds(acceptListData.getReadData()); ycblhb.setBqds(BigDecimal.ZERO); ycblhb.setNhrq(acceptListData.getInstallTime()); ycblhb.setLdh(acceptListData.getBuildingName()); ycblhb.setDyh(acceptListData.getCommunityName()); ycblhb.setMph(acceptListData.getLocation()); ycblhb.setBcj("37"); ycblhb.setCommOrgLevel(commOrgLevel); ycblhb.setCommOrgIdenty(commOrgIdenty); //log.info(JacksonUtil.obj2String(ycblhb)); ycblhbMapper.insertSelective(ycblhb); } } } //泾阳立户 else if("prd".equals(acceptData.getActive()) && acceptData.getCustomerId() == 22){ DynamicDataSourceContextHolder.setDataSourceKey("jingyang"); log.info("=====service当前连接的数据库是:" + DynamicDataSourceContextHolder.getDataSourceKey()); commOrgLevel = "10001017"; commOrgIdenty = "10001017"; if (acceptData.getAcceptListData() != null && acceptData.getAcceptListData().size() > 0) { for (AcceptListData acceptListData : acceptData.getAcceptListData()) { Ycblhb ycblhb = new Ycblhb(); ycblhb.setYhdz(acceptListData.getBuildingName() + acceptListData.getLocation()); ycblhb.setSbdz(acceptListData.getBuildingName() + acceptListData.getLocation()); ycblhb.setBc("001"); ycblhb.setBsm(acceptListData.getWaterMeterNo()); ycblhb.setZdsj("010101"); ycblhb.setYys("020101"); ycblhb.setBw(acceptListData.getBuildingName() + acceptListData.getLocation()); ycblhb.setBxh("4"); //ycblhb.setBqds(acceptListData.getReadData()); ycblhb.setBqds(BigDecimal.ZERO); ycblhb.setNhrq(acceptListData.getInstallTime()); ycblhb.setLdh(acceptListData.getBuildingName()); ycblhb.setDyh(acceptListData.getCommunityName()); ycblhb.setMph(acceptListData.getLocation()); ycblhb.setBcj("05"); ycblhb.setCommOrgLevel(commOrgLevel); ycblhb.setCommOrgIdenty(commOrgIdenty); //log.info(JacksonUtil.obj2String(ycblhb)); ycblhbMapper.insertSelective(ycblhb); } } } //汇川立户 else if("prd".equals(acceptData.getActive()) && acceptData.getCustomerId() == 38){ DynamicDataSourceContextHolder.setDataSourceKey("huichuan"); log.info("=====service当前连接的数据库是:" + DynamicDataSourceContextHolder.getDataSourceKey()); commOrgLevel = "10001002"; commOrgIdenty = "10001002"; if (acceptData.getAcceptListData() != null && acceptData.getAcceptListData().size() > 0) { for (AcceptListData acceptListData : acceptData.getAcceptListData()) { Ycblhb ycblhb = new Ycblhb(); ycblhb.setYhdz(acceptListData.getBuildingName() + acceptListData.getLocation()); ycblhb.setSbdz(acceptListData.getBuildingName() + acceptListData.getLocation()); ycblhb.setBc("001"); ycblhb.setBsm(acceptListData.getWaterMeterNo()); ycblhb.setZdsj("010101"); ycblhb.setYys("020101"); ycblhb.setBw(acceptListData.getBuildingName() + acceptListData.getLocation()); ycblhb.setBxh("4"); //ycblhb.setBqds(acceptListData.getReadData()); ycblhb.setBqds(BigDecimal.ZERO); ycblhb.setNhrq(acceptListData.getInstallTime()); ycblhb.setLdh(acceptListData.getBuildingName()); ycblhb.setDyh(acceptListData.getCommunityName()); ycblhb.setMph(acceptListData.getLocation()); ycblhb.setBcj("31"); ycblhb.setCommOrgLevel(commOrgLevel); ycblhb.setCommOrgIdenty(commOrgIdenty); //log.info(JacksonUtil.obj2String(ycblhb)); ycblhbMapper.insertSelective(ycblhb); } } } //监利 else if("prd".equals(acceptData.getActive()) && acceptData.getCustomerId() == 19){ DynamicDataSourceContextHolder.setDataSourceKey("jl"); log.info("=====service当前连接的数据库是:" + DynamicDataSourceContextHolder.getDataSourceKey()); if (acceptData.getAcceptListData() != null && acceptData.getAcceptListData().size() > 0) { for (AcceptListData acceptListData : acceptData.getAcceptListData()) { TRemoteCustomer tRemoteCustomerOld = tRemoteCustomerMapper.findByMeteraddr(acceptListData.getWaterMeterNo()); if (tRemoteCustomerOld != null) { log.info(acceptListData.getWaterMeterNo()+"已经存在"); continue; } TRemoteCustomer tRemoteCustomer = new TRemoteCustomer(); tRemoteCustomer.setFactoryid(5); tRemoteCustomer.setMeteraddr(acceptListData.getWaterMeterNo()); tRemoteCustomer.setUsername(acceptListData.getUsername()); tRemoteCustomer.setLinkman(acceptListData.getUsername()); tRemoteCustomer.setPhone(acceptListData.getUserPhone()); tRemoteCustomer.setPaperno(acceptListData.getIdCard()); tRemoteCustomer.setAddress(acceptListData.getLocation()); //tRemoteCustomer.setCaliber(); 口径 tRemoteCustomer.setInstalldate(acceptListData.getInstallTime()); tRemoteCustomer.setIfctrlvalve(0);//是否阀控表 tRemoteCustomer.setImported(0); //log.info(JSON.toJSONString(tRemoteCustomer)); //tRemoteCustomer.setOldmeterid(); tRemoteCustomer.setUsercode(acceptListData.getFileNo());//客户编码 //tRemoteCustomer.setExtenddata1(); //tRemoteCustomer.setExtenddata2(); //tRemoteCustomer.setExtenddata3(); //tRemoteCustomer.setOldmeteraddr(); //tRemoteCustomer.setValveflag(); //tRemoteCustomer.setId(); tRemoteCustomerMapper.insertSelective(tRemoteCustomer); } } }else { return; } } }catch (Exception e){ log.info("Kafka consumer Received error={},{}",e.getMessage(),e); e.printStackTrace(); } } }