`
阅读更多

    本文简单记录一下 spring 整合 rabbitmq,此处引入spring boot是为了方便引入和rabbitmq整合相关的jar包,并没有使用spring boot整合 rabbitmq。

 

实现功能

  1. 完成 spring 和 rabbitmq 的整合
  2. 完成使用 rabbitAdmin 创建队列等
  3. 完成使用 @Bean 注解声明队列等
  4. 完成使用 RabbitTemplate 进行发送消息
  5. 使用 SimpleMessageListenerContainer 进行消息的监听,可以对消息进行各种适配等

整合步骤:

1、引入 jar 包。

2、配置 ConnectionFacotry。

3、配置 RabbitAdmin 方便维护队列、交换器、绑定等。

4、配置 RabbitTemplate 方便程序中的消息的发送和接收等。

5、配置 SimpleMessageListenerContainer 方便程序中消息的监听。

 

小知识点:

1、在程序中进行队列、交换器、绑定的声明和使用 @Bean 注解来进行声明,如果要想在程序启动的时候自动创建这些队列等,那么在配置 RabbitAdmin 的时候需要将  autoStartup 属性设置成 true。

2、如果我们在程序运行的过程中需要动态修改 监听的队列或移除队列等,那么可以使用  SimpleMessageListenerContainer 来动态修改这些参数。

3、在如果我们想对接收到RabbitMQ发送的消息进行适配和消息转换等,那么使用SimpleMessageListenerContainer这个可以使用。

 

实现步骤:

1、引入 spring 整合 rabbitmq 整合的 maven 依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

2、spring 整合 rabbitmq 的核心配置类

/**
 * rabbitmq 配置
 *
 * @author huan.fu
 * @date 2018/10/17 - 10:57
 */
@Configuration
@Slf4j
public class RabbitmqConfiguration {

	/**
	 * 创建 rabbitmq 连接工厂
	 *
	 * @return
	 */
	@Bean
	public ConnectionFactory connectionFactory() {
		CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
		connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
		connectionFactory.setHost("140.143.237.224");
		connectionFactory.setPort(5672);
		connectionFactory.setUsername("root");
		connectionFactory.setPassword("root");
		connectionFactory.setVirtualHost("/");
		return connectionFactory;
	}

	/**
	 * rabbitmq 实现 AMQP 便携式的管理操作,比如创建队列、绑定、交换器等
	 *
	 * @param connectionFactory
	 * @return
	 */
	@Bean
	public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
		RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
		rabbitAdmin.setAutoStartup(true);
		return rabbitAdmin;
	}

	/**
	 * rabbit mq 模板
	 *
	 * @param connectionFactory
	 * @return
	 */
	@Bean
	public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
		return new RabbitTemplate(connectionFactory);
	}

	/**
	 * 消息监听容器
	 *
	 * @param connectionFactory
	 * @return
	 */
	@Bean
	public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
		SimpleMessageListenerContainer simpleMessageListenerContainer = new SimpleMessageListenerContainer(connectionFactory);
		// 设置监听的队列
		simpleMessageListenerContainer.setQueueNames("queue001", "queue002");
		// 指定要创建的并发使用者的数量,默认值是1,当并发高时可以增加这个的数值,同时下方max的数值也要增加
		simpleMessageListenerContainer.setConcurrentConsumers(3);
		// 最大的并发消费者
		simpleMessageListenerContainer.setMaxConcurrentConsumers(10);
		// 设置是否重回队列
		simpleMessageListenerContainer.setDefaultRequeueRejected(false);
		// 设置签收模式
		simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
		// 设置非独占模式
		simpleMessageListenerContainer.setExclusive(false);
		// 设置consumer未被 ack 的消息个数
		simpleMessageListenerContainer.setPrefetchCount(1);
		// 接收到消息的后置处理
		simpleMessageListenerContainer.setAfterReceivePostProcessors((MessagePostProcessor) message -> {
			message.getMessageProperties().getHeaders().put("接收到消息后", "在消息消费之前的一个后置处理");
			return message;
		});
		// 设置 consumer 的 tag
		simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {
			private AtomicInteger consumer = new AtomicInteger(1);

			@Override
			public String createConsumerTag(String queue) {
				return String.format("consumer:%s:%d", queue, consumer.getAndIncrement());
			}
		});
		// 设置消息监听器
		simpleMessageListenerContainer.setMessageListener((ChannelAwareMessageListener) (message, channel) -> {
			try {
				log.info("============> Thread:[{}] 接收到消息:[{}] ", Thread.currentThread().getName(), new String(message.getBody()));
				channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
			} catch (Exception e) {
				log.error(e.getMessage(), e);
				// 发生异常此处需要捕获到
				channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
			}
		});

		/**  ================ 消息转换器的用法 ================
		 simpleMessageListenerContainer.setMessageConverter(new MessageConverter() {
		 // 将 java 对象转换成 Message 对象
		 @Override public Message toMessage(Object object, MessageProperties messageProperties) {
		 return null;
		 }

		 // 将 message 对象转换成 java 对象
		 @Override public Object fromMessage(Message message) {
		 return null;
		 }
		 });
		 */

		/**  ================ 消息适配器的用法,用于处理各种不同的消息 ================
		 MessageListenerAdapter adapter = new MessageListenerAdapter();
		 // 设置真正处理消息的对象,可以是一个普通的java对象,也可以是 ChannelAwareMessageListener 等
		 adapter.setDelegate(null);
		 adapter.setDefaultListenerMethod("设置上一步中delegate对象中处理的方法名");

		 ContentTypeDelegatingMessageConverter converters = new ContentTypeDelegatingMessageConverter();
		 // 文本装换器
		 MessageConverter txtMessageConvert = null;
		 // json 转换器
		 MessageConverter jsonMessageConvert = null;

		 converters.addDelegate("text", txtMessageConvert);
		 converters.addDelegate("html/text", txtMessageConvert);
		 converters.addDelegate("text/plain", txtMessageConvert);

		 converters.addDelegate("json", jsonMessageConvert);
		 converters.addDelegate("json/*", jsonMessageConvert);
		 converters.addDelegate("application/json", jsonMessageConvert);

		 adapter.setMessageConverter(converters);
		 simpleMessageListenerContainer.setMessageListener(adapter);

		 */
		return simpleMessageListenerContainer;
	}


	@Bean
	public Queue queue003() {
		return new Queue("queue003", false, false, false, null);
	}

	@Bean
	public Exchange exchange003() {
		return new TopicExchange("exchange003", false, false, null);
	}

	@Bean
	public Binding binding003() {
		return new Binding("queue003", Binding.DestinationType.QUEUE, "exchange003", "save.*", null);
	}

}

 3、动态移除和增加对队列的监听

/**
 * 测试动态改变 SimpleMessageListenerContainer 的属性,比如动态增加需要监听的队列等
 *
 * @author huan.fu
 * @date 2018/10/17 - 15:12
 */
@Component
@Slf4j
public class DynamicSimpleMessageListenerContainerTest implements InitializingBean {

	@Autowired
	private ApplicationContext applicationContext;

	@Override
	public void afterPropertiesSet() throws Exception {
		new Thread(() -> {
			try {
				TimeUnit.SECONDS.sleep(10);
				SimpleMessageListenerContainer simpleMessageListenerContainer = applicationContext.getBean(SimpleMessageListenerContainer.class);
				log.info("移除对队列:[{}]的监听", "queue001");
				simpleMessageListenerContainer.removeQueueNames("queue001");
				TimeUnit.SECONDS.sleep(5);
				log.info("添加对队列:[{}]的监听", "queue001");
				String[] queueNames = simpleMessageListenerContainer.getQueueNames();
				Arrays.copyOf(queueNames, queueNames.length + 1);
				queueNames[queueNames.length - 1] = "queue001";
				simpleMessageListenerContainer.addQueueNames(queueNames);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}).start();
	}
}

 4、使用 RabbitAdmin 创建队列等

/**
 * 测试 rabbitAdmin
 *
 * @author huan.fu
 * @date 2018/10/17 - 12:54
 */
@Component
@Slf4j
public class RabbitAdminService implements InitializingBean {

	@Autowired
	private RabbitAdmin rabbitAdmin;

	/**
	 * 创建队列
	 *
	 * @param queueName
	 */
	public void createQueue(String queueName) {
		log.info("创建队列:[{}]", queueName);
		rabbitAdmin.declareQueue(new Queue(queueName, false, false, false, null));
	}

	/**
	 * 创建direct交换器
	 *
	 * @param exchangeName
	 */
	public void createDirectExchange(String exchangeName) {
		log.info("创建direct交换器:[{}]", exchangeName);
		rabbitAdmin.declareExchange(new DirectExchange(exchangeName, false, false, null));
	}

	/**
	 * 创建topic交换器
	 *
	 * @param exchangeName
	 */
	public void createTopicExchange(String exchangeName) {
		log.info("创建topic交换器:[{}]", exchangeName);
		rabbitAdmin.declareExchange(new TopicExchange(exchangeName, false, false, null));
	}

	@Override
	public void afterPropertiesSet() throws Exception {
		createQueue("queue001");
		createQueue("queue002");
		createDirectExchange("exchange001");
		createTopicExchange("exchange002");
		// 创建绑定
		rabbitAdmin.declareBinding(new Binding("queue001", Binding.DestinationType.QUEUE, "exchange001", "direct_001", null));
		// 创建绑定
		rabbitAdmin.declareBinding(new Binding("queue002", Binding.DestinationType.QUEUE, "exchange002", "topic.save.#", null));
	}
}

 5、使用 RabbitTemplate 进行发送消息

/**
 * RabbitTemplate测试
 *
 * @author huan.fu
 * @date 2018/10/17 - 14:35
 */
@Component
@Slf4j
public class RabbitTemplateTest implements InitializingBean {

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@Override
	public void afterPropertiesSet() throws Exception {
		new Thread(() -> {
			try {
				TimeUnit.SECONDS.sleep(5);
				IntStream.rangeClosed(1, 10).forEach(num -> rabbitTemplate.convertAndSend("exchange001", "direct_001", String.format("这个是第[%d]条消息.", num)));
			} catch (InterruptedException e) {
				log.error(e.getMessage(), e);
			}
		}).start();
	}
}

 6、启动类

/**
 * spring 整合 rabbitmq
 *
 * @author huan.fu
 * @date 2018/10/17 - 10:53
 */
@SpringBootApplication
public class RabbitMqApplication {
	public static void main(String[] args) {
		SpringApplication.run(RabbitMqApplication.class, args);
	}
}

 7、运行结果
 

程序代码

代码: https://gitee.com/huan1993/rabbitmq/tree/master/rabbitmq-spring

  • 大小: 172.6 KB
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics