|
@@ -1,10 +1,12 @@
|
|
|
package com.zcxk.zoniot.data.processor.listener;
|
|
|
|
|
|
+import com.zcxk.zoniot.data.processor.service.AsyncProcessDeviceDataService;
|
|
|
import com.zcxk.zoniot.data.processor.service.DeviceDataProcessor;
|
|
|
import com.zcxk.zoniot.smartcity.common.dto.DeviceDataDTO;
|
|
|
import com.zcxk.zoniot.smartcity.common.model.DeviceData;
|
|
|
import com.zcxk.zoniot.smartcity.common.utils.DtoEntityUtil;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.apache.commons.lang3.time.StopWatch;
|
|
|
import org.bson.Document;
|
|
|
import org.bson.types.ObjectId;
|
|
|
import org.springframework.beans.factory.InitializingBean;
|
|
@@ -14,6 +16,7 @@ import org.springframework.data.mongodb.core.query.BasicQuery;
|
|
|
import org.springframework.data.mongodb.core.query.Criteria;
|
|
|
import org.springframework.data.mongodb.core.query.Query;
|
|
|
import org.springframework.kafka.annotation.KafkaListener;
|
|
|
+import org.springframework.scheduling.annotation.Async;
|
|
|
import org.springframework.stereotype.Component;
|
|
|
|
|
|
import java.util.Collections;
|
|
@@ -34,15 +37,23 @@ public class DeviceDataMessageBus implements InitializingBean {
|
|
|
|
|
|
@Autowired
|
|
|
MongoTemplate mongoTemplate ;
|
|
|
+
|
|
|
+ @Autowired
|
|
|
+ AsyncProcessDeviceDataService asyncProcessDeviceDataService;
|
|
|
+
|
|
|
@KafkaListener(topics = {"${device.data.topic}"})
|
|
|
public void receiveDeviceDataMessage(String message) {
|
|
|
+ log.info("Receive DeviceDataMessage Method , Message = {}",message);
|
|
|
String[] splits = message.split("#");
|
|
|
Query query = new Query();
|
|
|
query.addCriteria(Criteria.where("_id").is(new ObjectId(splits[0])));
|
|
|
query.addCriteria(Criteria.where("deviceId").is(Long.parseLong(splits[1])));
|
|
|
DeviceData data = mongoTemplate.findOne(query, DeviceData.class);
|
|
|
- for(DeviceDataProcessor processor : deviceDataProcessors){
|
|
|
- processor.process(data);
|
|
|
+ asyncProcessDeviceDataService.asyncProcessDeviceData(data,deviceDataProcessors);
|
|
|
+ try {
|
|
|
+ Thread.sleep(100L);
|
|
|
+ } catch (InterruptedException e) {
|
|
|
+ log.error("thread interrupt",e);
|
|
|
}
|
|
|
}
|
|
|
|