|
@@ -0,0 +1,197 @@
|
|
|
+package com.huaxu.config;
|
|
|
+
|
|
|
+import org.slf4j.Logger;
|
|
|
+import org.slf4j.LoggerFactory;
|
|
|
+import org.springframework.amqp.core.*;
|
|
|
+import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
|
|
+import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
|
|
|
+import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
|
|
+import org.springframework.amqp.rabbit.core.RabbitAdmin;
|
|
|
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
|
|
+import org.springframework.beans.factory.annotation.Value;
|
|
|
+import org.springframework.beans.factory.config.ConfigurableBeanFactory;
|
|
|
+import org.springframework.context.annotation.Bean;
|
|
|
+import org.springframework.context.annotation.Configuration;
|
|
|
+import org.springframework.context.annotation.Primary;
|
|
|
+import org.springframework.context.annotation.Scope;
|
|
|
+
|
|
|
+import java.util.HashMap;
|
|
|
+import java.util.Map;
|
|
|
+
|
|
|
+/**
|
|
|
+ Broker:它提供一种传输服务,它的角色就是维护一条从生产者到消费者的路线,保证数据能按照指定的方式进行传输,
|
|
|
+ Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列。
|
|
|
+ Queue:消息的载体,每个消息都会被投到一个或多个队列。
|
|
|
+ Binding:绑定,它的作用就是把exchange和queue按照路由规则绑定起来.
|
|
|
+ Routing Key:路由关键字,exchange根据这个关键字进行消息投递。
|
|
|
+ vhost:虚拟主机,一个broker里可以有多个vhost,用作不同用户的权限分离。
|
|
|
+ Producer:消息生产者,就是投递消息的程序.
|
|
|
+ Consumer:消息消费者,就是接受消息的程序.
|
|
|
+ Channel:消息通道,在客户端的每个连接里,可建立多个channel.
|
|
|
+ */
|
|
|
+@Configuration
|
|
|
+public class RabbitConfig {
|
|
|
+
|
|
|
+ private final Logger logger = LoggerFactory.getLogger(this.getClass());
|
|
|
+
|
|
|
+ @Value("${message.spring.rabbitmq.host}")
|
|
|
+ private String messageHost;
|
|
|
+ @Value("${message.spring.rabbitmq.port}")
|
|
|
+ private int messagePort;
|
|
|
+ @Value("${message.spring.rabbitmq.username}")
|
|
|
+ private String messagePsername;
|
|
|
+ @Value("${message.spring.rabbitmq.password}")
|
|
|
+ private String messagePassword;
|
|
|
+ @Value("${message.spring.rabbitmq.virtual-host}")
|
|
|
+ private String messageVirtualHost;
|
|
|
+
|
|
|
+ @Value("${message.spring.rabbitmq.requested-heartbeat}")
|
|
|
+ private int messageRequestedHeartbeat;
|
|
|
+ @Value("${message.spring.rabbitmq.connection-timeout}")
|
|
|
+ private int messageConnectionTimeout;
|
|
|
+// @Value("${message.spring.rabbitmq.publisher-confirm-type}")
|
|
|
+// private CachingConnectionFactory.ConfirmType messagePublisherConfirms;
|
|
|
+
|
|
|
+ @Value("${message.spring.rabbitmq.publisher-confirms}")
|
|
|
+ private Boolean messagePublisherConfirms;
|
|
|
+ @Value("${message.spring.rabbitmq.publisher-returns}")
|
|
|
+ private Boolean messagePublisherReturns;
|
|
|
+
|
|
|
+ @Value("${message.spring.rabbitmq.listener.simple.acknowledge-mode}")
|
|
|
+ private AcknowledgeMode messageAcknowledgeMode;
|
|
|
+ @Value("${message.spring.rabbitmq.listener.simple.prefetch}")
|
|
|
+ private int messagePrefetch;
|
|
|
+
|
|
|
+ @Value("${receive.exchange.name}")
|
|
|
+ private String receiveExchangeName;
|
|
|
+ @Value("${dispath.routing.key}")
|
|
|
+ private String dispathRoutingKey;
|
|
|
+ @Value("${dispath.queue}")
|
|
|
+ private String dispathQueue;
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ @Primary
|
|
|
+ public ConnectionFactory MessageconnectionFactory() {
|
|
|
+ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(messageHost,messagePort);
|
|
|
+ connectionFactory.setUsername(messagePsername);
|
|
|
+ connectionFactory.setPassword(messagePassword);
|
|
|
+ connectionFactory.setVirtualHost(messageVirtualHost);
|
|
|
+ connectionFactory.setRequestedHeartBeat(messageRequestedHeartbeat);
|
|
|
+ connectionFactory.setConnectionTimeout(messageConnectionTimeout);
|
|
|
+// connectionFactory.
|
|
|
+// connectionFactory.setPublisherConfirmType(messagePublisherConfirms);
|
|
|
+ connectionFactory.setPublisherConfirms(messagePublisherConfirms);
|
|
|
+ connectionFactory.setPublisherReturns(messagePublisherReturns);
|
|
|
+ return connectionFactory;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean(name = "messageRabbitTemplate")
|
|
|
+ @Primary
|
|
|
+ public RabbitTemplate MessageRabbitTemplate() {
|
|
|
+ RabbitTemplate template = new RabbitTemplate(MessageconnectionFactory());
|
|
|
+ return template;
|
|
|
+ }
|
|
|
+
|
|
|
+ //配置消费者监听的容器
|
|
|
+ @Bean(name = "messageContainerFactory")
|
|
|
+ @Primary
|
|
|
+ public SimpleRabbitListenerContainerFactory MessageRabbitListenerContainerFactory() {
|
|
|
+ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
|
|
+ factory.setConnectionFactory(MessageconnectionFactory());
|
|
|
+// factory.setConcurrentConsumers(3);
|
|
|
+// factory.setMaxConcurrentConsumers(10);
|
|
|
+ factory.setPrefetchCount(messagePrefetch);
|
|
|
+ factory.setAcknowledgeMode(messageAcknowledgeMode);//设置确认模式
|
|
|
+ return factory;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean(name = "messageRabbitAdmin")
|
|
|
+ @Primary
|
|
|
+ public RabbitAdmin RabbitAdmin() {
|
|
|
+ RabbitAdmin rabbitAdmin = new RabbitAdmin(MessageconnectionFactory());
|
|
|
+ rabbitAdmin.setAutoStartup(true);
|
|
|
+// Map<String, Object> args = new HashMap<>();
|
|
|
+// args.put("x-queue-type", "classic");
|
|
|
+ rabbitAdmin.declareQueue(QueueBuilder.nonDurable(dispathQueue).build());
|
|
|
+ rabbitAdmin.declareExchange(new TopicExchange(receiveExchangeName, false, false, null));
|
|
|
+ rabbitAdmin.declareBinding(
|
|
|
+ BindingBuilder
|
|
|
+ .bind(QueueBuilder.nonDurable(dispathQueue).build()) //队列
|
|
|
+ .to(new TopicExchange(receiveExchangeName, false, false, null)) //直接创建交换机 建立关联关系
|
|
|
+ .with(dispathRoutingKey)); //指定路由Key
|
|
|
+ return rabbitAdmin;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+ @Value("${receiveData.spring.rabbitmq.host}")
|
|
|
+ private String dataHost;
|
|
|
+ @Value("${receiveData.spring.rabbitmq.port}")
|
|
|
+ private int dataPort;
|
|
|
+ @Value("${receiveData.spring.rabbitmq.username}")
|
|
|
+ private String dataUsername;
|
|
|
+ @Value("${receiveData.spring.rabbitmq.password}")
|
|
|
+ private String dataPassword;
|
|
|
+ @Value("${receiveData.spring.rabbitmq.virtual-host}")
|
|
|
+ private String dataVirtualHost;
|
|
|
+
|
|
|
+ @Value("${receiveData.spring.rabbitmq.requested-heartbeat}")
|
|
|
+ private int dataRequestedHeartbeat;
|
|
|
+ @Value("${receiveData.spring.rabbitmq.connection-timeout}")
|
|
|
+ private int dataConnectionTimeout;
|
|
|
+// @Value("${receiveData.spring.rabbitmq.publisher-confirm-type}")
|
|
|
+// private CachingConnectionFactory.ConfirmType dataPublisherConfirms;
|
|
|
+
|
|
|
+ @Value("${receiveData.spring.rabbitmq.publisher-confirms}")
|
|
|
+ private Boolean dataPublisherConfirms;
|
|
|
+
|
|
|
+ @Value("${receiveData.spring.rabbitmq.publisher-returns}")
|
|
|
+ private Boolean dataPublisherReturns;
|
|
|
+
|
|
|
+ @Value("${receiveData.spring.rabbitmq.listener.simple.acknowledge-mode}")
|
|
|
+ private AcknowledgeMode dataAcknowledgeMode;
|
|
|
+ @Value("${receiveData.spring.rabbitmq.listener.simple.prefetch}")
|
|
|
+ private int dataPrefetch;
|
|
|
+
|
|
|
+ @Bean
|
|
|
+ public ConnectionFactory ReceiveDataconnectionFactory() {
|
|
|
+ CachingConnectionFactory connectionFactory = new CachingConnectionFactory(messageHost,messagePort);
|
|
|
+ connectionFactory.setUsername(messagePsername);
|
|
|
+ connectionFactory.setPassword(messagePassword);
|
|
|
+ connectionFactory.setVirtualHost(dataVirtualHost);
|
|
|
+ connectionFactory.setRequestedHeartBeat(dataRequestedHeartbeat);
|
|
|
+ connectionFactory.setConnectionTimeout(dataConnectionTimeout);
|
|
|
+// connectionFactory.setPublisherConfirmType(dataPublisherConfirms);
|
|
|
+ connectionFactory.setPublisherConfirms(dataPublisherConfirms);
|
|
|
+ connectionFactory.setPublisherReturns(dataPublisherReturns);
|
|
|
+ return connectionFactory;
|
|
|
+ }
|
|
|
+
|
|
|
+ @Bean(name = "receiveDataRabbitTemplate")
|
|
|
+ @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
|
|
|
+ //必须是prototype类型
|
|
|
+ public RabbitTemplate ReceiveDatarabbitTemplate() {
|
|
|
+ RabbitTemplate template = new RabbitTemplate(ReceiveDataconnectionFactory());
|
|
|
+ return template;
|
|
|
+ }
|
|
|
+
|
|
|
+ //配置消费者监听的容器
|
|
|
+ @Bean(name = "receiveDataContainerFactory")
|
|
|
+ public SimpleRabbitListenerContainerFactory ReceiveDatarabbitListenerContainerFactory() {
|
|
|
+ SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
|
|
+ factory.setConnectionFactory(ReceiveDataconnectionFactory());
|
|
|
+// factory.setConcurrentConsumers(3);
|
|
|
+// factory.setMaxConcurrentConsumers(10);
|
|
|
+ factory.setPrefetchCount(dataPrefetch);
|
|
|
+ factory.setAcknowledgeMode(dataAcknowledgeMode);//设置确认模式手工确认
|
|
|
+ return factory;
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+
|
|
|
+}
|