lihui001 3 лет назад
Родитель
Сommit
95008e5340

+ 19 - 0
zoniot-pay/zoniot-pay-core/src/main/java/com/zcxk/rmcp/pay/dao/CommunityMapper.java

@@ -40,4 +40,23 @@ public interface CommunityMapper {
     Community findByNameV2(@Param("siteId") Integer siteId, @Param("province") Integer province, @Param("city") Integer city, @Param("region") Integer region, @Param("communityName") String communityName);
 
     Community findByName(@Param("siteId") Integer siteId, @Param("communityName") String communityName);
+
+
+    /**
+    * 如果id存在,则删除再插入,如果不存在则插入
+    * @author Andy
+    * @date 10:17 2021/9/7
+    * @param community:
+    * @return void
+    **/
+    void replaceInsertSelective(@Param("community")Community community);
+
+    /**
+    * 物理删除
+    * @author Andy
+    * @date 10:22 2021/9/7
+    * @param id:
+    * @return void
+    **/
+    void deleteById(int id);
 }

+ 3 - 0
zoniot-pay/zoniot-pay-core/src/main/java/com/zcxk/rmcp/pay/entity/Community.java

@@ -25,6 +25,9 @@ public class Community implements Serializable {
     @ApiModelProperty(value="客户id", required = true)
     private Integer customerId;
 
+    @ApiModelProperty(value="租户ID", required = true)
+    private String tenantId;
+
     @ApiModelProperty(value="省", required = true)
     private Integer province;
 

+ 65 - 0
zoniot-pay/zoniot-pay-core/src/main/resources/mapper/CommunityMapper.xml

@@ -109,6 +109,67 @@
         )
     </insert>
 
+    <insert id="replaceInsertSelective" useGeneratedKeys="true" keyProperty="community.id">
+        REPLACE INTO sc_community
+        <trim prefix="(" suffix=")" suffixOverrides=",">
+            <if test="community.id!=null"> id,</if>
+            <if test="community.siteId!=null"> site_id,</if>
+            <if test="community.name!=null"> `name`,</if>
+            <if test="community.code!=null"> code,</if>
+            <if test="community.tenantId!=null"> tenant_id,</if>
+            <if test="community.province!=null"> province,</if>
+            <if test="community.city!=null"> city,</if>
+            <if test="community.region!=null"> region,</if>
+            <if test="community.longitude!=null"> longitude,</if>
+            <if test="community.latitude!=null"> latitude,</if>
+            <if test="community.districtId!=null"> district_id,</if>
+            <if test="community.address!=null"> address,</if>
+            <if test="community.remark!=null"> remark,</if>
+            <if test="community.status!=null"> `status`,</if>
+            <if test="community.dateCreate!=null"> date_create,</if>
+            <if test="community.dateUpdate!=null"> date_update,</if>
+            <if test="community.createBy!=null"> create_by,</if>
+            <if test="community.updateBy!=null"> update_by,</if>
+        </trim>
+        VALUES
+        <trim prefix="(" suffix=")" suffixOverrides=",">
+            <if test="community.id!=null">#{community.id,jdbcType=INTEGER},
+            </if>
+            <if test="community.siteId!=null">#{community.siteId,jdbcType=INTEGER},
+            </if>
+            <if test="community.name!=null">#{community.name,jdbcType=VARCHAR},
+            </if>
+            <if test="community.code!=null">#{community.code,jdbcType=VARCHAR},
+            </if>
+            <if test="community.tenantId!=null">#{community.tenantId,jdbcType=INTEGER},
+            </if>
+            <if test="community.province!=null">#{community.province,jdbcType=INTEGER},
+            </if>
+            <if test="community.city!=null">#{community.city,jdbcType=INTEGER},
+            </if>
+            <if test="community.region!=null">#{community.region,jdbcType=INTEGER},
+            </if>
+            <if test="community.longitude!=null"> #{community.longitude,jdbcType=VARCHAR},</if>
+            <if test="community.latitude!=null"> #{community.latitude,jdbcType=VARCHAR},</if>
+            <if test="community.districtId!=null">#{community.districtId,jdbcType=INTEGER},
+            </if>
+            <if test="community.address!=null">#{community.address,jdbcType=VARCHAR},
+            </if>
+            <if test="community.remark!=null">#{community.remark,jdbcType=VARCHAR},
+            </if>
+            <if test="community.status!=null">#{community.status,jdbcType=INTEGER},
+            </if>
+            <if test="community.dateCreate!=null">#{community.dateCreate,jdbcType=TIMESTAMP},
+            </if>
+            <if test="community.dateUpdate!=null">#{community.dateUpdate,jdbcType=TIMESTAMP},
+            </if>
+            <if test="community.createBy!=null">#{community.createBy,jdbcType=VARCHAR},
+            </if>
+            <if test="community.updateBy!=null">#{community.updateBy,jdbcType=VARCHAR},
+            </if>
+        </trim>
+    </insert>
+
     <!--auto generated Code-->
     <insert id="insertSelective" useGeneratedKeys="true" keyProperty="community.id">
         INSERT INTO sc_community
@@ -376,5 +437,9 @@
         <if test="siteId != null"> and site_id = #{siteId} </if>
     </select>
 
+    <delete id="deleteById">
+    DELETE FROM sc_community where ID = #{id}
+  </delete>
+
 </mapper>
 

+ 69 - 0
zoniot-pay/zoniot-pay-web/src/main/java/com/zcxk/rmcp/pay/commom/BaseSync.java

@@ -0,0 +1,69 @@
+package com.zcxk.rmcp.pay.commom;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.zcxk.core.common.exception.BusinessException;
+import com.zcxk.rmcp.pay.enums.SqlType;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Andy
+ * @version V1.0
+ * @description: base数据同步
+ * @date 2021/7/29
+ **/
+@Slf4j
+public class BaseSync<T> {
+
+    /**
+    * @author Andy
+    * @description 转换
+    * @date 10:13 2021/7/29
+    * @param msg, syncTable, tClass
+    * @return java.util.Map<java.lang.String,java.util.List<T>>
+    **/
+    public Map<String, List<T>> stringConvertMap(String msg, String syncTable, Class<T> tClass) {
+        JSONObject jsonObject = JSONObject.parseObject(msg);
+        JSONArray jsonArray   = jsonObject.getJSONArray("data");
+        String typeSql   = jsonObject.getString("type");
+        String tableName = jsonObject.getString("table");
+        // 检查数据是否正常
+        checkData(jsonArray, typeSql, tableName, syncTable);
+        Map<String, List<T>> result = new HashMap<>(jsonArray.size());
+        List<T> list = new ArrayList<>();
+        for (int i = 0; i < jsonArray.size(); i++) {
+            list.add(JSON.toJavaObject(jsonArray.getJSONObject(i), tClass));
+        }
+        log.info("【mq sync数据】同步的表名:{},操作类型:{},数据大小:{},同步的数据:{}", syncTable, typeSql, list.size(), jsonArray.toString());
+        result.put(typeSql, list);
+        return result;
+    }
+
+    /**
+     * @author Andy
+     * @description 检查数据是否符合
+     * @date 9:48 2021/7/29
+     * @param jsonArray, typeSql, tableName
+     * @return boolean
+     **/
+    private void checkData(JSONArray jsonArray, String typeSql, String tableName, String syncTable){
+        if (!syncTable.equals(tableName)) {
+            throw BusinessException.builder("【mq sync数据错误】表名不一致,需要同步的表【"+ syncTable +"】,当前同步过来的表:" + tableName);
+        }
+        try {
+            SqlType.valueOf(typeSql);
+        } catch (IllegalArgumentException e) {
+            throw BusinessException.builder("【mq sync数据错误】表名:"+ tableName + ",操作:【"+ typeSql +"】不属于更新操作。" );
+        }
+        if (jsonArray.size() == 0) {
+            throw BusinessException.builder("【mq sync数据错误】表名:"+ syncTable + ",无数据需要更新。");
+        }
+    }
+
+}

+ 15 - 0
zoniot-pay/zoniot-pay-web/src/main/java/com/zcxk/rmcp/pay/commom/constant/MqConstant.java

@@ -0,0 +1,15 @@
+package com.zcxk.rmcp.pay.commom.constant;
+
+/**
+ * @author Andy
+ * @version V1.0
+ * @description: mq常量
+ * @date 2021/7/28
+ **/
+public class MqConstant {
+
+    /**
+     * 同步小区队列
+     */
+    public static final String SYNC_COMMUNITY_QUEUE = "canal-t-community-queue";
+}

+ 28 - 0
zoniot-pay/zoniot-pay-web/src/main/java/com/zcxk/rmcp/pay/enums/SqlType.java

@@ -0,0 +1,28 @@
+package com.zcxk.rmcp.pay.enums;
+
+/**
+ * @author Andy
+ * @version V1.0
+ * @description: sql类型
+ * @date 2021/7/29
+ **/
+public enum SqlType {
+
+    /**
+     * 更新
+     */
+    UPDATE,
+
+    /**
+     * 插入
+     */
+    INSERT,
+
+    /**
+     * 删除
+     */
+    DELETE,
+
+    ;
+
+}

+ 110 - 0
zoniot-pay/zoniot-pay-web/src/main/java/com/zcxk/rmcp/pay/mq/SyncCommunityConsumer.java

@@ -0,0 +1,110 @@
+package com.zcxk.rmcp.pay.mq;
+
+import com.zcxk.core.common.exception.BusinessException;
+import com.zcxk.rmcp.pay.commom.BaseSync;
+import com.zcxk.rmcp.pay.commom.constant.MqConstant;
+import com.zcxk.rmcp.pay.dao.CommunityMapper;
+import com.zcxk.rmcp.pay.entity.Community;
+import com.zcxk.rmcp.pay.enums.SqlType;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.springframework.amqp.core.Message;
+import org.springframework.amqp.rabbit.annotation.RabbitHandler;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Andy
+ * @version V1.0
+ * @description: 同步小区
+ * @date 2021/7/28
+ **/
+@Slf4j
+@Component
+public class SyncCommunityConsumer extends BaseSync<Community> {
+
+    @Resource
+    private CommunityMapper communityMapper;
+
+    /**
+     * 确定同步来的表名字
+     */
+    private static final String TABLE_NAME = "rmcp_community";
+
+    /**
+    * @author Andy
+    * @description mq入口
+    * @date 11:33 2021/7/29
+    * @param message
+    * @return void
+    **/
+    @RabbitListener(queues = MqConstant.SYNC_COMMUNITY_QUEUE)
+    @RabbitHandler
+    public void exec(Message message) {
+        log.info("======================同步小区数据 begin======================");
+        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
+        if (StringUtils.isEmpty(msg)) {
+            log.info("同步小区数据 消费为空");
+            return;
+        }
+        try {
+            exec(msg);
+        } catch (Exception e) {
+            log.error("同步小区数据异常:{}", msg);
+        }
+        log.info("======================同步小区数据  end=======================");
+    }
+
+    /**
+    * @author Andy
+    * @description 执行入口
+    * @date 11:33 2021/7/29
+    * @param msg
+    * @return void
+    **/
+    private void exec(String msg) throws Exception {
+        try {
+            Map<String, List<Community>> mmp = super.stringConvertMap(msg, TABLE_NAME, Community.class);
+            mmp.forEach((key, value) -> {
+                log.info("同步小区数据{}操作,需要执行:{}条", key, value.size());
+                tableOperation(key, value);
+                log.info("同步小区数据{}操作,执行成功:{}条", key, value.size());
+            });
+        } catch (BusinessException e) {
+            log.error(e.getMsg());
+        }
+    }
+
+    /**
+    * @author Andy
+    * @description 执行表操作
+    * @date 10:19 2021/7/29
+    * @param operationType, communitys
+    * @return void
+    **/
+    private void tableOperation (String operationType, List<Community> list){
+        switch (SqlType.valueOf(operationType)) {
+            case INSERT:
+            case UPDATE:
+                for (Community community : list) {
+                    communityMapper.replaceInsertSelective(community);
+                    log.info("同步小区数据,"+ operationType +"操作,操作数据:{}", community.toString());
+                }
+                break;
+            case DELETE:
+                for (Community community : list) {
+                    communityMapper.deleteById(community.getId());
+                    log.info("同步小区数据,DELETE操作,操作数据:Id:{}", community.getId());
+                }
+                break;
+            default:
+                return;
+        }
+    }
+
+}