这篇文章,我们使用springboot来简单整合一下RabbitMQ,由于sprinboot的 “约定优于配置“,使得我们整合起来非常方便。参考文档
实现功能:
- 自动申明队列、交换器和绑定
- 使用自定义的MessageConvert如何进行配置
- 如何传递 JavaBean 消息和普通消息
- 如何手动签收消息
实现要点:
** 自定申明队列、交换器和绑定
1、通过使用@Queue等注解实现
2、使用@RabbitListener来实现
** 使用自定义的MessageConvert
1、这个只需要 MessageConvert 申明的Bean 存在 @Bean注解且可以被Spring扫描到即可
** 传递 JavaBean 消息
1、传递的 JavaBean 消息需要实现 java.io.Serializable 接口(默认情况下)
** 消费端如何监听消息
1、使用@RabbitListener和@RabbitHandler结合使用
springboot整合rabbitmq自动配置位置:
整合步骤:
1、引入amqp依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>1.5.17.RELEASE</version> </dependency>
2、rabbitmq 的springboot 配置
server: port: 9087 spring: rabbitmq: host: 140.143.237.224 port: 5672 username: root password: root virtual-host: / connection-timeout: 10000 listener: simple: acknowledge-mode: manual # 手动应答 auto-startup: true default-requeue-rejected: false # 不重回队列 concurrency: 5 max-concurrency: 20 prefetch: 1 # 每次只处理一个信息 retry: enabled: false
3、配置自定义的消息转换器
/** * rabbitmq 配置类 * * @author huan.fu * @date 2018/10/24 - 19:42 */ @Configuration @Slf4j public class RabbitConfiguration { @Bean public MessageConverter messageConverter() { return new SimpleMessageConverter() { @Override protected Message createMessage(Object object, MessageProperties messageProperties) throws MessageConversionException { Message message = super.createMessage(object, messageProperties); log.info("使用自定义的MessageConvert转换消息"); return message; } }; } }
4、编写用户实体类
/** * 用户实体类 * * @author huan.fu * @date 2018/10/22 - 15:35 */ @Data @NoArgsConstructor @AllArgsConstructor @Builder public class User implements Serializable { private Integer userId; private String username; private String password; }
注意:这个实体类需要实现序列化接口
5、创建消息生产者
@Component public class RabbitProducer { @Autowired private AmqpTemplate amqpTemplate; /** * 发送消息 */ @SendTo public void sendMessage() { new Thread(() -> { try { TimeUnit.SECONDS.sleep(3); } catch (InterruptedException e) { e.printStackTrace(); } // 发送简单消息 IntStream.rangeClosed(1, 10).forEach(num -> { String body = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " : " + num; MessageProperties properties = new MessageProperties(); properties.setContentEncoding("UTF-8"); properties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT); Message message = new Message(body.getBytes(Charset.forName(properties.getContentEncoding())), properties); amqpTemplate.convertAndSend("rabbit-springboot-exchange", "rabbit-springboot-routingkey", message); }); // 发送java bean 消息 IntStream.rangeClosed(1, 10).forEach(num -> { User user = User.builder().userId(num).username("zhangsan:" + num).password("666666").build(); amqpTemplate.convertAndSend("rabbit-springboot-exchange", "rabbit-springboot-routingkey.javabean", user); }); }).start(); } }
注意: AmqpTemplate 为springboot 默认配置的
6、创建消息接收者
public class RabbitReceiver { @RabbitHandler @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "rabbit-springboot-queue", durable = "true", exclusive = "false", autoDelete = "false"), exchange = @Exchange(value = "rabbit-springboot-exchange", type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"), key = "rabbit-springboot-routingkey" )) public void receiveMessage(Message message, Channel channel) { String encoding = message.getMessageProperties().getContentEncoding(); log.info("接收到消息1:[{}]", new String(message.getBody(), Charsets.toCharset(encoding))); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { log.error(e.getMessage(), e); } } @RabbitHandler @RabbitListener(bindings = @QueueBinding( value = @Queue(value = "rabbit-springboot-queue-javabean", durable = "true", exclusive = "false", autoDelete = "false"), exchange = @Exchange(value = "rabbit-springboot-exchange", type = ExchangeTypes.DIRECT, durable = "true", autoDelete = "false"), key = "rabbit-springboot-routingkey.javabean" )) public void receiveMessage(User user, Message message, Channel channel) { log.info("接收到消息2:[{}]", user); try { channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (IOException e) { log.error(e.getMessage(), e); } } }
注意:此处使用 @RabbitListener 来自动创建队列、交换器和绑定等
7、常见消息启动类
@SpringBootApplication @Slf4j public class Application implements ApplicationRunner { @Autowired private RabbitProducer rabbitProducer; public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Override public void run(ApplicationArguments applicationArguments) throws Exception { rabbitProducer.sendMessage(); } }
8、运行结果
完成代码:
springboot整合rabbitmq:https://gitee.com/huan1993/rabbitmq/tree/master/rabbitmq-springboot
相关推荐
SpringBoot整合RabbitMQ的详细过程 **1.该篇博文首先讲述了交换机和队列之间的绑定关系** ①direct、②fanout、③topic **2.然后讲消息的回调** 四种情况下,确认触发哪个回调函数: ①消息推送到server,但是在...
Springboot整合RabbitMQ最简单demo,适用于springcloud项目,作为消息总线适用,需要安装RabbitMQ,Mac linux可以使用命令行一键安装,在项目配置文件配置好端口即可(已默认配置),启动项目访问8080端口,参数见controller.
springBoot整合RabbitMQ案例
Springboot整合RabbitMQ最简单demo
springBoot整合rabbitMQ,包括erlang20.3,rabbitmq-server-3.7.14安装包 整合4种常用模式+高级特性死信队列 暂未整合TTL队列 博客地址:...
reabbitmq的完整学习 >rabbitMq软件上传到liunx服务器 >RabbitMQ入门 ...>Springboot整合RabbitMQ >SpringBoot整合RabbitMQ(交换机与多个队列绑定) >RabbitMQ-集群搭建>负载均衡-HAProxy 完整链接地址: ...
SpringBoot整合Rabbitmq发送接收消息实战 另外,博主发起了SpringBoot整合Rabbitmq这一系列的gitchat交流会。刚兴趣的童鞋可以进入交流:https://gitbook.cn/gitchat/activity/5b90f9214fb1bd5c9acd4338 交流QQ:...
SpringBoot整合RabbitMQ 实现消息发送确认与消息接收确认机制 源码及教材 可以参考博客: https://blog.csdn.net/qq_29914837/article/details/93376741
springboot整合rabbitmq,开启手工确认,保证消息100%投递。springboot整合rabbitmq,开启手工确认,保证消息100%投递。
基于SpringBoot整合RabbitMQ发送邮件通知---构建springcloud微服务资源搭建。
springboot整合rabbitmq项目
Springboot整合RabbitMq学习
SpringBoot 整合 RabbitMQ demo
springboot整合rabbitmq转发mqtt
文件内包含了rabbit安装的必需文件以及springboot整合rabbitmq的完整代码,代码里包含了原生的rabbitmq使用代码和整合springboot后的使用代码,还有rabbit队列的所有消息队列模式,代码简单易懂,解压打开就可以使用
springboot整合rabbitmq使用死信队列
1 SpringBoot整合RabbitMQ实战系列教程-整合配置篇-源码数据库
RabbitMQ入门到进阶(Spring整合RabbitMQ&SpringBoot整合RabbitMQ).doc
springboot使用rabbitmq工具类,里面包含比较原生的方法,还有一套是我结合springboot框架写的一套方法,里面有两个方法,看情况使用,一般使用框架的方法比较好,因为框架方法时前辈们封装好经过检验的没有问题的方法,...
SpringBoot整合RabbitMQ基础学习Exchange源码