Quellcode durchsuchen

kafka回传阀控信息

wangli vor 4 Jahren
Ursprung
Commit
6362ce57dc

+ 26 - 0
smart-city-platform/src/main/java/com/bz/smart_city/commom/util/jsonSerializer/BigDecimalJsonSerializer.java

@@ -0,0 +1,26 @@
+package com.bz.smart_city.commom.util.jsonSerializer;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.text.DecimalFormat;
+
+/**
+ * @description BigDecimal保留三位小数
+ * @auto wangli
+ * @data 2020/12/31 14:17
+ */
+public class BigDecimalJsonSerializer extends JsonSerializer<BigDecimal> {
+
+    private DecimalFormat df = new DecimalFormat("0.000");
+
+    @Override
+    public void serialize(BigDecimal bigDecimal, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException {
+        if(bigDecimal != null){
+            jsonGenerator.writeString(df.format(bigDecimal));
+        }
+    }
+}

+ 24 - 0
smart-city-platform/src/main/java/com/bz/smart_city/commom/util/jsonSerializer/DoubleJsonSerializer.java

@@ -0,0 +1,24 @@
+package com.bz.smart_city.commom.util.jsonSerializer;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import java.io.IOException;
+import java.text.DecimalFormat;
+
+/**
+ * @description Double保留三位小数
+ * @auto wangli
+ * @data 2020/12/31 14:17
+ */
+public class DoubleJsonSerializer extends JsonSerializer<Double> {
+
+    private DecimalFormat df = new DecimalFormat("0.000");
+    @Override
+    public void serialize(Double aDouble, JsonGenerator jsonGenerator, SerializerProvider serializerProvider) throws IOException {
+        if(aDouble != null) {
+            jsonGenerator.writeString(df.format(aDouble));
+        }
+    }
+}

+ 33 - 0
smart-city-platform/src/main/java/com/bz/smart_city/commom/util/jsonSerializer/ValveStateJsonSerializer.java

@@ -0,0 +1,33 @@
+package com.bz.smart_city.commom.util.jsonSerializer;
+
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.SerializerProvider;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.text.DecimalFormat;
+
+/**
+ * @description
+ * @auto wangli
+ * @data 2021/01/20 11:23
+ */
+public class ValveStateJsonSerializer extends JsonSerializer<String> {
+   // 阀门状态: 0关阀 1开阀  2 无阀,3异常
+    @Override
+    public void serialize(String value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
+        if(value != null && value .equals("0")){
+            gen.writeString("关阀");
+        }else
+        if(value != null && value .equals("1")){
+            gen.writeString("1开阀");
+        }else
+        if(value != null && value .equals("2")){
+            gen.writeString("无阀");
+        }else
+        if(value != null && value .equals("3")){
+            gen.writeString("异常");
+        }
+    }
+}

+ 2 - 2
smart-city-platform/src/main/java/com/bz/smart_city/controller/pay/PayControlRecordController.java

@@ -39,7 +39,7 @@ public class PayControlRecordController {
             @ApiParam(value = "阀控规则", required = false) @RequestParam(required = false) String controlRuleId,
             @ApiParam(value = "操作类型 0关阀 1开阀", required = false) @RequestParam(required = false) String type,
             @ApiParam(value = "操作结果 0执行中 1成功 2失败", required = false) @RequestParam(required = false) String result,
-            @ApiParam(value = "阀门状态  0关阀 1开阀 2异常", required = false) @RequestParam(required = false) String state,
+            @ApiParam(value = "阀门状态: 0关阀 1开阀  2无阀,3异常", required = false) @RequestParam(required = false) String state,
             @ApiParam(value = "开始时间yyyyMMdd", required = false) @RequestParam(required = false) String beginTime,
             @ApiParam(value = "结束时间yyyyMMdd", required = false) @RequestParam(required = false) String endTime
     ){
@@ -91,7 +91,7 @@ public class PayControlRecordController {
             @ApiParam(value = "阀控规则", required = false) @RequestParam(required = false) String controlRuleId,
             @ApiParam(value = "操作类型 0关阀 1开阀", required = false) @RequestParam(required = false) String type,
             @ApiParam(value = "操作结果 0执行中 1成功 2失败", required = false) @RequestParam(required = false) String result,
-            @ApiParam(value = "阀门状态  0关阀 1开阀 2异常", required = false) @RequestParam(required = false) String state,
+            @ApiParam(value = "阀门状态: 0关阀 1开阀  2 无阀,3异常", required = false) @RequestParam(required = false) String state,
             @ApiParam(value = "开始时间yyyyMMdd", required = false) @RequestParam(required = false) String beginTime,
             @ApiParam(value = "结束时间yyyyMMdd", required = false) @RequestParam(required = false) String endTime,
             @ApiParam(value = "页数,非必传,默认第一页", required = false, defaultValue = "1") @RequestParam(required = false, defaultValue = "1") int pageNum,

+ 9 - 0
smart-city-platform/src/main/java/com/bz/smart_city/dao/pay/PayControlRecordMapper.java

@@ -5,6 +5,7 @@ import com.bz.smart_city.entity.pay.PayControlRecord;
 import org.apache.ibatis.annotations.Mapper;
 import org.apache.ibatis.annotations.Param;
 
+import java.math.BigInteger;
 import java.util.List;
 
 /**
@@ -17,5 +18,13 @@ public interface PayControlRecordMapper {
     Integer add(@Param("payControlRecord")PayControlRecord payControlRecord);
 
     //查询
+
     List<PayControlRecordDto> findList(@Param("payControlRecordDto") PayControlRecordDto payControlRecordDto);
+
+    PayControlRecord findControlRecordByMetercode(@Param("meterCode") String meterCode);
+
+    Integer updateDeviceValveState(@Param("valveStatus") String valveStatus,@Param("meterCode") String meterCode);
+
+    Integer updateControlRecordResult(@Param("result") Integer result,@Param("valveStatus") Integer valveStatus,@Param("id") BigInteger id);
+
 }

+ 4 - 1
smart-city-platform/src/main/java/com/bz/smart_city/dto/pay/payfee/PayfeeAccountInfoDto.java

@@ -1,6 +1,8 @@
 package com.bz.smart_city.dto.pay.payfee;
 
+import com.bz.smart_city.commom.util.jsonSerializer.ValveStateJsonSerializer;
 import com.fasterxml.jackson.annotation.JsonFormat;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
 import io.swagger.annotations.ApiModel;
 import io.swagger.annotations.ApiModelProperty;
 import lombok.Data;
@@ -55,6 +57,7 @@ public class PayfeeAccountInfoDto {
     @ApiModelProperty(value = "水表电子号" )
     private String metereleno;
     @ApiModelProperty(value = "阀门状态: 0关阀 1开阀  2 无阀,3异常" )
-    private String controlStatus;
+    @JsonSerialize(using = ValveStateJsonSerializer.class)
+    private String valveStatus;
 
 }

+ 1 - 1
smart-city-platform/src/main/java/com/bz/smart_city/entity/pay/PayControlRecord.java

@@ -30,7 +30,7 @@ public class PayControlRecord {
     @ApiModelProperty(example="1",notes = "操作结果 0执行中 1成功 2失败",position = 20)
     private Integer result;
 
-    @ApiModelProperty(example="1",notes = "阀门状态  0关阀 1开阀 2异常",position = 25)
+    @ApiModelProperty(example="1",notes = "阀门状态: 0关阀 1开阀  2 无阀,3异常",position = 25)
     private Integer state;
 
     @ApiModelProperty(example="1",notes = "阀控规则id",position = 30)

+ 64 - 0
smart-city-platform/src/main/java/com/bz/smart_city/kafka/ValveStateConsumer.java

@@ -0,0 +1,64 @@
+package com.bz.smart_city.kafka;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.bz.smart_city.dao.pay.PayControlRecordMapper;
+import com.bz.smart_city.entity.pay.PayControlRecord;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.Resource;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * @description
+ * @auto wangli
+ * @data 2021/1/20 8:55
+ */
+@Component
+@Slf4j
+public class ValveStateConsumer {
+    @Resource
+    private PayControlRecordMapper payControlRecordMapper;
+
+    @KafkaListener(topics = {"billing_valveStatus_receiving"})
+    public void listener(ConsumerRecord<String, String> record){
+
+        Optional<String> msg = Optional.ofNullable(record.value());
+        if(msg.isPresent()){
+            log.info("kafka阀门接口数据,{}",msg.get());
+
+            JSONObject jsonObject = JSON.parseObject(msg.get());
+            String valueState = jsonObject.getString("meterStatus"); //阀门状态: 0关阀 1开阀  2 无阀,3异常
+            String meterNo = jsonObject.getString("meterNo");
+
+            //更新设备信息的阀门阀门状态
+            payControlRecordMapper.updateDeviceValveState(valueState,meterNo);
+            //更新阀控记录(最后一条)
+            PayControlRecord payControlRecord = payControlRecordMapper.findControlRecordByMetercode(meterNo);
+
+            Integer state =Integer.valueOf(valueState);
+
+            if(payControlRecord != null && payControlRecord .getResult() == 0){ //执行中的才更新
+                if (payControlRecord.getType() == 0) { //0关阀操作
+                    if(state == 0){   // 阀门为关阀则执行成功
+                        payControlRecordMapper.updateControlRecordResult(1,state,payControlRecord.getId());
+                    }else
+                    if(state == 3){   // 阀门为异常则执行失败
+                        payControlRecordMapper.updateControlRecordResult(2,state,payControlRecord.getId());
+                    }
+                }else if (payControlRecord.getType() == 1) { //1开阀操作
+                    if(state == 1){   // 阀门为开阀则执行成功
+                        payControlRecordMapper.updateControlRecordResult(1,state,payControlRecord.getId());
+                    }else
+                    if(state == 3){   // 阀门为异常则执行失败
+                        payControlRecordMapper.updateControlRecordResult(2,state,payControlRecord.getId());
+                    }
+                }
+            }
+        }
+    }
+}

+ 28 - 1
smart-city-platform/src/main/resources/application-dev.properties

@@ -50,7 +50,34 @@ spring.redis.jedis.pool.max-active=8
 spring.redis.jedis.pool.max-idle=8
 spring.redis.jedis.pool.max-wait=-1ms
 spring.redis.jedis.pool.min-idle=2
- 
+
+
+#kafka
+#============== kafka ===================
+# 指定kafka 代理地址,可以多个
+#spring.kafka.bootstrap-servers=192.168.0.157:9092
+spring.kafka.bootstrap-servers=114.135.61.188:36377
+
+#=============== provider  =======================
+#spring.kafka.producer.retries=0
+## 每次批量发送消息的数量
+#spring.kafka.producer.batch-size=16384
+#spring.kafka.producer.buffer-memory=33554432
+#
+# 指定消息key和消息体的编解码方式
+#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
+#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
+#=============== consumer  =======================
+# 指定消费者group id
+spring.kafka.consumer.group-id = water_api_jf_valveStatus_received
+spring.kafka.consumer.auto-offset-reset=latest
+spring.kafka.consumer.enable-auto-commit=true
+#spring.kafka.consumer.auto-commit-interval=100
+
+# 指定消息key和消息体的编解码方式
+spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+
 
 
 jwt.secret=smartCity123

+ 23 - 8
smart-city-platform/src/main/resources/application-prd.properties

@@ -122,16 +122,31 @@ websocket.so.keepalive=true
 websocket.so.backlog=100
 system.water.meter.lora.code=LORAWM
 
-#kafka配置
-spring.kafka.bootstrap-servers=172.18.110.178:9092
-#kafka消费者配置
-spring.kafka.consumer.group-id=api-group
+#kafka
+#============== kafka ===================
+# 指定kafka 代理地址,可以多个
+#spring.kafka.bootstrap-servers=192.168.0.157:9092
+spring.kafka.bootstrap-servers=114.135.61.188:36377
+
+#=============== provider  =======================
+#spring.kafka.producer.retries=0
+## 每次批量发送消息的数量
+#spring.kafka.producer.batch-size=16384
+#spring.kafka.producer.buffer-memory=33554432
+#
+# 指定消息key和消息体的编解码方式
+#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
+#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
+#=============== consumer  =======================
+# 指定消费者group id
+spring.kafka.consumer.group-id = water_api_jf_valveStatus_received
 spring.kafka.consumer.auto-offset-reset=latest
 spring.kafka.consumer.enable-auto-commit=true
-#kafka生产者配置
-spring.kafka.producer.retries=0
-spring.kafka.producer.batch-size=4096
-spring.kafka.producer.buffer-memory=40960
+#spring.kafka.consumer.auto-commit-interval=100
+
+# 指定消息key和消息体的编解码方式
+spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
 
 #\u9AD8\u5FB7\u5730\u56FE
 geomap.apikey=bb218fd3700b37dd1e02872365cab4d5

+ 25 - 1
smart-city-platform/src/main/resources/application-sit.properties

@@ -52,7 +52,31 @@ spring.redis.jedis.pool.max-idle=8
 spring.redis.jedis.pool.max-wait=-1ms
 spring.redis.jedis.pool.min-idle=2
 
-
+#kafka
+#============== kafka ===================
+# 指定kafka 代理地址,可以多个
+#spring.kafka.bootstrap-servers=192.168.0.157:9092
+spring.kafka.bootstrap-servers=114.135.61.188:36377
+
+#=============== provider  =======================
+#spring.kafka.producer.retries=0
+## 每次批量发送消息的数量
+#spring.kafka.producer.batch-size=16384
+#spring.kafka.producer.buffer-memory=33554432
+#
+# 指定消息key和消息体的编解码方式
+#spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
+#spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
+#=============== consumer  =======================
+# 指定消费者group id
+spring.kafka.consumer.group-id = water_api_jf_valveStatus_received
+spring.kafka.consumer.auto-offset-reset=latest
+spring.kafka.consumer.enable-auto-commit=true
+#spring.kafka.consumer.auto-commit-interval=100
+
+# 指定消息key和消息体的编解码方式
+spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
 
 jwt.secret=smartCity123
 jwt.expiration=36000

+ 21 - 0
smart-city-platform/src/main/resources/mapper/pay/PayControlRecordMapper.xml

@@ -76,4 +76,25 @@
         )
 
     </insert>
+
+    <select id="findControlRecordByMetercode" resultType="com.bz.smart_city.entity.pay.PayControlRecord">
+        select
+            pcr.id,
+            pcr.type,
+            pcr.result,
+            pcr.state
+        from pay_control_record pcr
+        left join sc_device d on pcr.meter_id =d.id
+        where d.metercode = #{meterCode}
+        order by pcr.create_date desc
+        limit 1
+    </select>
+    <update id="updateDeviceValveState">
+        udpate sc_device  set control_status = #{valveStatus}  where metercode = #{meterCode}
+    </update>
+    <update id="updateControlRecordResult">
+        update pay_control_record set result =#{result} ,state =#{valveStatus} where id = #{id}
+    </update>
+
+
 </mapper>

+ 1 - 1
smart-city-platform/src/main/resources/mapper/pay/payFeeMapper.xml

@@ -114,7 +114,7 @@
             p.id as "waterPropertyId",
             p.name as "waterPropertyName",
             device.water_meter_no as "metereleno",
-            device.control_status as "controlStatus"
+            device.control_status as "valveStatus"
         from pay_base_customerandmeterrela c
         left join pay_base_account a on c.account_id=a.id
         left join pay_base_waterproperty p on c.waterProperty_id=p.id