|
@@ -0,0 +1,112 @@
|
|
|
+package com.zcxk.rmcp.pay.mq;
|
|
|
+
|
|
|
+import com.bz.zoneiot.core.common.exception.BusinessException;
|
|
|
+import com.zcxk.rmcp.pay.commom.BaseSync;
|
|
|
+import com.zcxk.rmcp.pay.commom.constant.MqConstant;
|
|
|
+import com.zcxk.rmcp.pay.dao.CommunityMapper;
|
|
|
+import com.zcxk.rmcp.pay.dao.UserMapper;
|
|
|
+import com.zcxk.rmcp.pay.entity.Community;
|
|
|
+import com.zcxk.rmcp.pay.entity.User;
|
|
|
+import com.zcxk.rmcp.pay.enums.SqlType;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
+import org.springframework.amqp.core.Message;
|
|
|
+import org.springframework.amqp.rabbit.annotation.RabbitHandler;
|
|
|
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
+
|
|
|
+import javax.annotation.Resource;
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
+import java.util.List;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author Andy
|
|
|
+ * @version V1.0
|
|
|
+ * @description: 同步小区
|
|
|
+ * @date 2021/7/28
|
|
|
+ **/
|
|
|
+@Slf4j
|
|
|
+@Component
|
|
|
+public class SyncUserConsumer extends BaseSync<User> {
|
|
|
+
|
|
|
+ @Resource
|
|
|
+ private UserMapper userMapper;
|
|
|
+
|
|
|
+ /**
|
|
|
+ * 确定同步来的表名字
|
|
|
+ */
|
|
|
+ private static final String TABLE_NAME = "uims_user";
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @author Andy
|
|
|
+ * @description mq入口
|
|
|
+ * @date 11:33 2021/7/29
|
|
|
+ * @param message
|
|
|
+ * @return void
|
|
|
+ **/
|
|
|
+ @RabbitListener(queues = MqConstant.SYNC_COMMUNITY_QUEUE)
|
|
|
+ @RabbitHandler
|
|
|
+ public void exec(Message message) {
|
|
|
+ log.info("======================同步用户数据 begin======================");
|
|
|
+ String msg = new String(message.getBody(), StandardCharsets.UTF_8);
|
|
|
+ if (StringUtils.isEmpty(msg)) {
|
|
|
+ log.info("同步用户数据 消费为空");
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ try {
|
|
|
+ exec(msg);
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("同步用户数据异常:{}", msg);
|
|
|
+ }
|
|
|
+ log.info("======================同步用户数据 end=======================");
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @author Andy
|
|
|
+ * @description 执行入口
|
|
|
+ * @date 11:33 2021/7/29
|
|
|
+ * @param msg
|
|
|
+ * @return void
|
|
|
+ **/
|
|
|
+ private void exec(String msg) throws Exception {
|
|
|
+ try {
|
|
|
+ Map<String, List<User>> mmp = super.stringConvertMap(msg, TABLE_NAME, User.class);
|
|
|
+ mmp.forEach((key, value) -> {
|
|
|
+ log.info("同步用户数据{}操作,需要执行:{}条", key, value.size());
|
|
|
+ tableOperation(key, value);
|
|
|
+ log.info("同步用户数据{}操作,执行成功:{}条", key, value.size());
|
|
|
+ });
|
|
|
+ } catch (BusinessException e) {
|
|
|
+ log.error(e.getMsg());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @author Andy
|
|
|
+ * @description 执行表操作
|
|
|
+ * @date 10:19 2021/7/29
|
|
|
+ * @param operationType, communitys
|
|
|
+ * @return void
|
|
|
+ **/
|
|
|
+ private void tableOperation (String operationType, List<User> list){
|
|
|
+ switch (SqlType.valueOf(operationType)) {
|
|
|
+ case INSERT:
|
|
|
+ case UPDATE:
|
|
|
+ for (User user : list) {
|
|
|
+ userMapper.replaceInsertSelective(user);
|
|
|
+ log.info("同步用户数据,"+ operationType +"操作,操作数据:{}", user.toString());
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ case DELETE:
|
|
|
+ for (User user : list) {
|
|
|
+ /*userMapper.deleteByUserId();
|
|
|
+ log.info("同步用户数据,DELETE操作,操作数据:Id:{}", community.getId());*/
|
|
|
+ }
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+}
|