我们经常使用消息队列进行系统之间的解耦,日志记录等等。但是有时候我们在使用 RabbitMQ时,由于exchange、bindKey、routingKey没有设置正确,导致我们发送给交换器(exchange)的消息,由于没有正确的RoutingKey可能会存在一个消息丢失的情况,如果我们希望知道那些消息经过exchange之后,没有被正确的存入消息队列,那么应该如何进行处理。
方案一:使用 mandatory 参数配合 ReturnListener 来进行解决
方案二:使用备份交换器 (alternate exchange) 来进行解决
方案一介绍:
mandatory参数的含义:
true:表示当交换器无法根据自身的类型和路由键找到一个符合条件的队列时,那么RabbitMQ会调用 Basic.Return 命令将消息返回给生产者。生产者使用ReturnListener 来监听没有被正确路由到消息队列中的消息。
false:表示当交换器无法根据自身的类型和路由键找到一个服务条件的队列时,那么RabbitMQ会丢弃这个消息。
注意事项:
1、有时候发现即使 mandatory参数设置成 true,也没有进入 ReturnListener,那么这个可能是什么原因呢?其实这个可能是受RabbitMQ配置的内存和磁盘告警限制。(http://www.rabbitmq.com/alarms.html)
2、这是一个RabbitMQ配置的磁盘告警导致没有进入ReturnListener的例子。(http://rabbitmq.1065348.n5.nabble.com/ReturnListener-is-not-invoked-td24549.html)
示例代码:
/** * RabbitMQ 生产者 * <pre> * 1、ReturnListener 的使用。 * >> mandatory: 参数需要设置成 true , ReturnListener 才会生效。 * >> 用于获取到没有路由到消息队列中的消息。 * 2、ReturnListener 的注意事项 http://www.rabbitmq.com/alarms.html * >> 受到内存和磁盘的限制 * >> http://rabbitmq.1065348.n5.nabble.com/ReturnListener-is-not-invoked-td24549.html(一个RabbitMQ disk_free_limit 参数导致ReturnListener没有进入的例子) * * </pre> * * @author huan.fu * @date 2018/8/21 - 15:23 */ public class RabbitProducer { private static final String EXCHANGE_NAME = "exchange_demo"; private static final String ROUTING_KEY = "missing_routing_key"; private static final String BINDING_KEY = "bingkey_demo"; private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "140.143.237.224"; private static final int PORT = 5672; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(IP_ADDRESS); connectionFactory.setPort(PORT); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); try ( // 创建一个连接 Connection connection = connectionFactory.newConnection(); // 创建信道 Channel channel = connection.createChannel() ) { // 创建一个 type="direct"持久化、非自动删除的交换器 channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null); // 创建一个 持久化、非排他的、非自动删除的交换器 channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 将交换器与队列通过路由键绑定 使用 bindingKey channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, BINDING_KEY); // 发送一条持久化消息 String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 没有被正确路由到消息队列的消息.mandatory参数设置成true"; try { // 使用 routingKey channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, true, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); System.err.println("消息发送完成......"); } catch (IOException e) { e.printStackTrace(); } /** * 处理生产者没有正确路由到消息队列的消息 * 这个可能不会生效:受到 rabbitmq 配置的内存和磁盘的限制 {@link http://www.rabbitmq.com/alarms.html} */ channel.addReturnListener((replyCode, replyText, exchange, routingKey, properties, body) -> { System.out.println("replyCode:" + replyCode); System.out.println("replyText:" + replyText); System.out.println("exchange:" + exchange); System.out.println("routingKey:" + routingKey); System.out.println("properties:" + properties); System.out.println("body:" + new String(body, StandardCharsets.UTF_8)); }); } } }
方案二介绍:
使用方案一,我们需要自己写ReturnListener,这样业务代码就变的复杂了,那么有没有一种简单的方法呢?那就是使用 备份交换器(Alternate Exchange)
声明交换器可以在channel.exchangeDeclare的时候 添加 alternate-exchange 参数来实现,交换器的类型建议声明成 fanout 类型,因为消息被重新发送到备份交换器时的路由键和从生产者出发的路由键是一致的。
示例代码:
public class RabbitProducer { private static final String EXCHANGE_NAME = "exchange_demo"; private static final String BINDING_KEY = "bingkey_demo"; private static final String QUEUE_NAME = "queue_demo"; private static final String IP_ADDRESS = "140.143.237.224"; private static final int PORT = 5672; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory connectionFactory = new ConnectionFactory(); connectionFactory.setHost(IP_ADDRESS); connectionFactory.setPort(PORT); connectionFactory.setUsername("root"); connectionFactory.setPassword("root"); try ( // 创建一个连接 Connection connection = connectionFactory.newConnection(); // 创建信道 Channel channel = connection.createChannel() ) { Map<String, Object> arguments = new HashMap<>(16); arguments.put("alternate-exchange", "backup-exchange"); channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, arguments); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, BINDING_KEY); // 声明一个 fanout 类型的交换器,建议此处使用 fanout 类型的交换器 channel.exchangeDeclare("backup-exchange", "fanout", true, false, null); // 消息没有被路由的之后存入的队列 channel.queueDeclare("unRoutingQueue", true, false, false, null); channel.queueBind("unRoutingQueue", "backup-exchange", ""); // 发送一条持久化消息 String message = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date()) + " 没有被正确的路由到消息队列,此时此消息会进入 unRoutingQueue"; try { // 使用 routingKey channel.basicPublish(EXCHANGE_NAME, "not-exists-routing-key", true, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes(StandardCharsets.UTF_8)); System.err.println("消息发送完成......"); } catch (IOException e) { e.printStackTrace(); } } } }
上例图解:
相关推荐
jmeter测试Rabbitmq的方法 AMQP Publisher/Consumer ...Expire(超期时间):该值必须为正数(与消息 TTL 不同,该值不可以为 0),所以如果该参数设置为 1000 ,则表示该 queue 如果在 1 秒钟之内未被使用则会被删除。
Go语言版本rabbitmq消息队列库:simple、worker、Fanout 模型、Direct 模型、Topic模型。 RabbitMQ 是使用Erlang编写的一个...RabbitMQ与邮局的主要区别是它不处理纸张,而是接受,存储和转发数据消息的二进制数据块。
同时,它支持消息确认机制,确保消息被正确接收和处理。 灵活的路由:RabbitMQ支持多种交换机类型和路由策略,可以根据消息的内容、标签等属性将消息路由到不同的队列或消费者。 多语言支持:RabbitMQ提供了丰富的...
另外,Kafka 的定位主要在日志等方面, 因为Kafka 设计的初衷就是处理日志的,可以看做是一个日志(消息)系统一个重要组件,针对性很强,所以 如果业务方面还是建议选择 RabbitMq 。 还有就是,Kafka 的性能(吞吐...
消费者:从消息队列中接收和处理消息的应用程序。 绑定:队列与交换器之间的逻辑关系,定义了如何将消息路由到队列。 交换器:将消息路由到队列的规则引擎。 路由键:与消息一起发送的字符串,用于交换器将消息路由...
3消息被拒绝 发布确认模式:消息成功发送到交互机 生产者发送消息,如果路由错误不能到达指定队列 解决方法有如下几种: 1使用备份交换器路由到备胎队列消费。这样可以保证未被路由的消息不会丢失。 2通过消息的回调...
在业务逻辑的异步处理,系统解耦,分布式通信以及控制高并发的场景下,消息队列有着广泛的应用。本项目基于Spring的AMQP模块,整合流行的开源消息队列中间件rabbitMQ,实现一个向rabbitMQ添加和读取消息的功能。并...
此外,面试官还可能会询问关于RabbitMQ的高级特性,例如如何实现消息的持久化、如何实现消息的路由、如何实现消息的过期和死信等。对于这些问题,开发者需要深入了解RabbitMQ的内部机制和API,以及如何使用这些特性...
routing 路由选择 通配符模式 topics 主题 手动和自动确认消息 队列的持久化和非持久化 RabbitMq的延迟队列 在官网教程中,描述了六类工作队列模式: 简单队列模式:最简单的工作队列,其中一个消息生产者,一个...
本文来自于民工哥技术之路,本章介绍了rabbitmq的基本原理、基本运维操作、常见故障处理以及RabbitMQ来部署分布式集群系统的三种方法。简介AMQP,即AdvancedMessageQueuingProtocol,高级消息队列协议,是应用层协议...
AMQP作为比JMS更加高级的消息协议,支持更多的消息路由和消息模式。 包含的特性如下: 如上图,生产者消费者模型:添加了一个队列,并创建了两个消费者用于监听队列消息,我们发现,当有消息到达时,两个消费者会...
Ruby 中的轻量级后台处理,由 RabbitMQ 和优秀的并发 ruby gem 提供支持。安装将此行添加到您的应用程序的Gemfile : gem 'proletariat'并运行: $ bundle如何使用RabbitMQ 连接配置如果您没有使用默认的 ...
通过实际示例,我们将展示如何创建和管理消息队列、发布和订阅消息,以及处理消息确认和异常。此外,文章还将讨论RabbitMQ的一些高级特性,如路由、主题交换和持久化设置,以及如何结合Spring Boot进行应用。最后,...
目前的exchange的路由策略是:每个需要队列的服务独享一个队列(queue),消费者(consumer)采用ACK自动应答模式处理队列消息。 如果需要新增一个队列服务,需要做如下开发步骤: 1.创建队列,发送消息 <?php $...
我们假设在您的处理管道的开始处(或接近开始处),存在一个将消息路由到现有架构实例的交换(例如,根据stable , next , latest消息路由键)。 当切换到具有零停机时间的较新的处理管道时,这对于(beta)测试和...
Mongo作为后端数据存储区(用于保存消息和已处理的数据) 配置驱动的队列系统 主要交易所名称 制片人的演员系统中的演员数量 将使用者排队。 您可以根据需要添加任意数量的队列使用者。 对于每个使用者,可配置...
您依赖于,并且您不关心消息将被处理的顺序 队列/ Exchange绑定是静态的。 即,没有应用动态路由规则,因此我们可以避免问题 您的应用程序正在 Kubernetes 中运行 我们需要的: 当节点不可用时,重新连接并使用回退...
Web应用程序可以将消息路由到与SMS传递网关一起使用的Java应用程序。 MMO游戏可以使用灵活的路由RabbitMQ提供的功能将事件通知传播到玩家和位置。 可以从交易系统到特定地理区域的销售点在感兴趣的各方之间分配...
RabbitMQ生产者消息确认:publisher-confirms(发送到交换机确认),publisher-returns(路由到队列确认) 定时任务:@EnableScheduling,@Scheduled,cron表达式 RabbitMQ消费者消息应答:@RabbitListener,listener.simple....