当前位置: 首页 > news >正文

平乡县网站建设平台/最新新闻热点话题

平乡县网站建设平台,最新新闻热点话题,南通建设工程信息网官网,wordpress 图片缩略图相信自己,终会成功 目录 Spring连接RabbitMQ 1.引入依赖 2.配置rabbitmq 3.生产者(简单模式) Bean 将方法的返回值交给 Spring 容器管理 作用 4.消费者 5.测试 消息确认机制(用于控制消息的确认行为) 持久性 发送方确认 重试机制(在消息处理失败时&…

相信自己,终会成功

目录

Spring连接RabbitMQ

1.引入依赖

2.配置rabbitmq

3.生产者(简单模式)

@Bean 将方法的返回值交给 Spring 容器管理

作用

4.消费者

5.测试

消息确认机制(用于控制消息的确认行为)

持久性

发送方确认

重试机制(在消息处理失败时,自动重试)

死信队列(DLQ处理无法被正常消费的消息)

设置TTL

事务支持(确保消息的发送和确认在事务中执行)

消息分发  决定了消息如何从生产者传递到消费者


Spring连接RabbitMQ

1.引入依赖

        <dependency><groupId>org.springframework.amqp</groupId><artifactId>spring-rabbit-test</artifactId><scope>test</scope></dependency>

2.配置rabbitmq

在yml中配置

spring:
#  application:
#    name=rabbit-extensions-demorabbitmq:addresses: amqp://用户名:密码@云服务器IP地址:5672/虚拟主机listener:simple:#        acknowledge-mode: none  #消息接收确认#        acknowledge-mode: auto  #消息接收确认acknowledge-mode: manual  #消息接收确认prefetch: 1  #预取数量retry:enabled: true # 开启消费者失败重试initial-interval: 5000ms # 初始失败等待时长为5秒max-attempts: 5 # 最大重试次数#    publisher-confirm-type: correlated   #消息发送确认

properties 配置

spring.rabbitmq.host=云服务器IP地址
spring.rabbitmq.port=5672
spring.rabbitmq.username=用户名
spring.rabbitmq.password=密码

3.生产者(简单模式)

声明队列和交换机(使用Spring AMQP中的 QueueBuilder 和 ExchangeBuilder)


//声明常量
public class Constants {public static final String ACK_QUEUE = "ack.queue";public static final String ACK_EXCHANGE = "ack.exchange";}
    
@Configuration
public class RabbitMQConfig {//声明队列@Bean("ackQueue")public Queue ackQueue(){return QueueBuilder.durable(Constants.ACK_QUEUE).build();}
//声明交换机@Bean("directExchange")public DirectExchange directExchange(){return ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build();}//    将队列绑定到交换机,并指定路由键(Routing Key)
//    @Bean("ackBinding")
//    public Binding ackBinding(Exchange directExchange, Queue queue){
//        return BindingBuilder.bind(queue).to(directExchange).with("ack").noargs();
//    }//  将队列绑定到交换机,并指定路由键(Routing Key)
//  directExchange 的类型是 DirectExchange,这是一个具体的交换机类型。@Bean("ackBinding")public Binding ackBinding(@Qualifier("directExchange") DirectExchange directExchange, @Qualifier("ackQueue") Queue queue){return BindingBuilder.bind(queue).to(directExchange).with("ack");}
}//   BindingBuilder.bind(queue).to(directExchange).with("ack");//   这里没有调用 noargs(),因为 DirectExchange 的绑定不需要额外的参数。//   由于 directExchange 是 DirectExchange 类型,这段代码明确指定了交换机的类型,适用于 Direct 类型的交换机。
  • BindingBuilder.bind(queue).to(directExchange).with("ack")
  • bind(queue):指定要绑定的队列
  • to(directExchange):指定要绑定的交换机
  • with("ack"):指定路由键(Routing Key)
  • noargs():表示不设置额外的绑定参数(可选)。

@Configuration在类方法上面的注解

作用定义 Bean:在配置类中,可以使用 @Bean 注解来定义 Spring 容器管理的 Bean。替代 XML 配置@Configuration 注解是 Java 配置的方式,可以完全替代传统的 XML 配置文件。模块化配置:可以将不同的配置分散到多个配置类中,便于管理和维护。

@Configuration 与 @Bean 的关系

@Configuration 类中的 @Bean 方法会被 Spring 拦截,确保每次调用返回的是同一个 Bean 实例(单例模式)。如果去掉 @Configuration 注解,@Bean 方法将不会被 Spring 代理,每次调用都会返回一个新的实例。

@Configuration 与 @Component 的区别

@Configuration:用于定义配置类,通常包含 @Bean 注解的方法,用于显式定义 Bean。@Component:用于标记普通的组件类(如 Service、Repository 等),Spring 会自动扫描并注册为 Bean。

指定一个接口,接口的 value 元素可用于在使用注释时传递特定值

 指定调用路径

    @Resource(name="rabbitTemplate")private RabbitTemplate rabbitTemplate;@RequestMapping("/ack")public String ack() {rabbitTemplate.convertAndSend(Constants.ACK_EXCHANGE, "ack", "consumer ack mode test...");return "消息发送成功";}

@Resource 是 Java 的注解,用于依赖注入 . name="rabbitTemplate" 表示注入一个名为 rabbitTemplate 的 Bean。RabbitTemplate 对象,它是 Spring AMQP 提供的一个工具类,用于与 RabbitMQ 进行交互(发送和接收消息)convertAndSend 是 RabbitTemplate 的一个方法,用于将消息发送到指定的交换器

   public void convertAndSend(String exchange, String routingKey, Object object) throws AmqpException {this.convertAndSend(exchange, routingKey, object, (CorrelationData)null);}
//String exchange交换机的名字
//String routingKey 指定的路由键,决定消息是如何从路由器到队列
//Object object 指定发送的内容

QueueBuilder.durable(Constants.ACK_QUEUE).build()

使用 QueueBuilder 创建一个持久化的队列。durable(true) 表示队列是持久化的,即使 RabbitMQ 服务器重启,队列也不会丢失。Constants.ACK_QUEUE 是一个常量,表示队列的名称。

ExchangeBuilder.directExchange(Constants.ACK_EXCHANGE).build()

使用 ExchangeBuilder 创建一个直连交换机(Direct Exchange)。Constants.ACK_EXCHANGE 是一个常量,表示交换机的名称。


@Bean 将方法的返回值交给 Spring 容器管理

作用

  1. 将对象交给 Spring 管理

    • 被 @Bean 注解标记的方法,其返回值会被 Spring 容器注册为一个 Bean。

    • 这个 Bean 可以被其他组件(如 @Autowired)注入和使用。

  2. 控制 Bean 的生命周期

    • Spring 容器会管理 Bean 的创建、初始化、销毁等生命周期。

    • 可以通过 @Bean 的 initMethod 和 destroyMethod 属性指定初始化和销毁方法。

  3. 自定义 Bean 的名称

    • 默认情况下,Bean 的名称是方法名。

    • 可以通过 @Bean("自定义名称") 指定 Bean 的名称。

4.消费者

@Configuration
public class AckListerner {
//@RabbitListener 表示是一个消息监听器,在这个代码中表示监听Constants.ACK_QUEUE@RabbitListener(queues = Constants.ACK_QUEUE)public void handMessage(Message message, Channel channel) throws IOException {
//每次从队列中消费一条消息时,RabbitMQ都会为该消息分配一个唯一的deliveryTag。long deliveryTag = message.getMessageProperties().getDeliveryTag();try {//消费者逻辑System.out.printf("接收到消息: %s, deliveryTag: %d \n", new String(message.getBody(),"UTF-8"), message.getMessageProperties().getDeliveryTag());//进行业务逻辑处理System.out.println("业务逻辑处理");System.out.println("业务处理完成");//            这是对消息的肯定确认(acknowledgment),表示消息已经被成功处理。
//            deliveryTag指定要确认的消息。
//            false表示不进行批量确认,只确认当前消息。channel.basicAck(deliveryTag,false);} catch (Exception e) {//            deliveryTag指定要否定的消息。
//            false表示不进行批量否定确认。
//            true表示将消息重新放回队列,以便稍后重新处理。channel.basicNack(deliveryTag, false, true);}}}

5.测试

在RabbitMQ可视化界面中可以看到新建的交换机和队列

deliverytag正常从1开始,因为先演示的生产者,队列中存在一条数据,重新启动程序后,消费者自动消费了数据,但是因为我没截图,只好重新发送一条

消费成功后的队列


消息确认机制(用于控制消息的确认行为)

Spring提供的AcknowledgeMode

AcknowledgeMode.NONE

这种模式下,消息一旦投递给消费者,不管消费者是否成功处理了消息,RabbitMQ就会自动确认消息,从Rabbit MQ队列中移除消息,如果消费者处理消息失效,消息可能会丢失

AcknowledgeMode.AUTO(默认)

这种模式下,消费者在消息处理成功时会自动确认消息,但如果处理过程中抛出了异常,则不会确认消息

AcknowledgeMode.MANUAL

这种模式下,消费者必须在成功处理消息后显式调用basicAck方法来确认消息,如果消息未被确认,RabbitMQ会认为消息尚未被成功处理,并且会在消费者可用时重新投递该消息,这种模式提供了消息处理的可靠性,因为即使消费者处理消息后失败,消息也不会丢失,而是可以被重新处理


持久性

交换机持久化

队列持久化

消息的持久化

消息是存储在队列中,所以消息的持久化,需要队列持久化+消息持久化

如果只设置了队列持久化,MQ重启后,消息就会丢失

如果只设置了消息的持久化,MQ重启后,队列会丢失,消息也会丢失

//消息非持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);
//消息持久化
//        message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
  • getMessageProperties():获取消息的属性对象(MessageProperties)。

  • MessageDeliveryMode.PERSISTENT

    • 消息会被持久化到磁盘。

    • 即使 RabbitMQ 服务器重启,消息也不会丢失。

    • 适用于需要确保消息不丢失的场景。

  • MessageDeliveryMode.NON_PERSISTENT

    • 消息不会被持久化到磁盘,仅存储在内存中。

    • 如果 RabbitMQ 服务器重启,消息会丢失。

    • 适用于对消息可靠性要求不高的场景,性能更高。

队列/交换机队列/消息(交换机默认持久化)
持久化/持久化都不会消失都不会消失
非持久化/非持久化都会消失都消失
持久化/非持久化都不会消失队列不会消失,消息会消失
非持久化/持久化都消失都消失

发送方确认

在 RabbitMQ 中,发送方确认(Publisher Confirms) 是一种确保消息成功到达 RabbitMQ 服务器的机制。通过启用发送方确认,生产者可以知道消息是否被 RabbitMQ 成功接收。如果消息未被接收,生产者可以进行重试或其他处理。

生产者发送消息到 RabbitMQ 服务器。RabbitMQ 服务器接收消息后,会向生产者发送一个确认(ack)或未确认(nack)的信号。生产者通过回调方法处理确认或未确认的信号。

1.confirm模式

通常在计算机编程、系统交互或业务流程等场景中,confirm 模式指的是一种确认机制。

public String confirm() {CorrelationData correlationData = new CorrelationData("1");confirmRabbitTemplate.convertAndSend(Constants.CONFIRM_EXCHANGE, "confirm111", "confirm test...", correlationData);return "消息发送成功";}

CorrelationData 的作用

CorrelationData("1"):初始化 CorrelationData,并为其设置一个唯一标识符(例如 "1"

  • 消息关联

    • CorrelationData 用于将消息与其确认状态关联起来。

    • 每个消息可以有一个唯一的 CorrelationData 对象。

  • 确认回调

    • 当 RabbitMQ 服务器确认收到消息时,会触发确认回调。

    • 在回调中,可以通过 CorrelationData 识别是哪条消息被确认。

 @Beanpublic RabbitTemplate confirmRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);//设置回调方法rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {@Overridepublic void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("执行了confirm方法");if (ack){System.out.printf("接收到消息, 消息ID: %s \n", correlationData==null? null: correlationData.getId());}else {System.out.printf("未接收到消息, 消息ID: %s, cause: %s \n", correlationData==null? null: correlationData.getId(), cause);//相应的业务处理}}});//消息被退回时, 回调方法rabbitTemplate.setMandatory(true);rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {@Overridepublic void returnedMessage(ReturnedMessage returned) {System.out.println("消息退回:"+returned);}});return rabbitTemplate;}

消息确认回调(Confirm Callback)

    public interface ConfirmCallback {void confirm(@Nullable CorrelationData var1, boolean var2, @Nullable String var3);}
  1. CorrelationData correlationData

    • 关联数据,用于标识消息。

    • 可以是 null,表示没有关联数据。

  2. boolean ack

    • 表示消息是否被成功确认。

    • true:消息被 RabbitMQ 服务器成功接收。

    • false:消息未被 RabbitMQ 服务器接收。

  3. String cause

    • 如果 ack 为 false,表示未确认的原因。

    • 如果 ack 为 true,通常为 null

public void setReturnsCallback(ReturnsCallback returnCallback) {Assert.state(this.returnsCallback == null || this.returnsCallback.equals(returnCallback), "Only one ReturnCallback is supported by each RabbitTemplate");this.returnsCallback = returnCallback;}

消息退回回调(Returns Callback)

通过 setMandatory(true) 设置强制消息退回

如果消息无法路由到队列,RabbitMQ 会将消息退回给生产者。

这么设置存在两个问题

1.这种方式设置ConfirmCallback影响所有使用RabbitTemple的方法

2.重复调用接口时会提示错误

2.return退回模式

return 退回模式主要用于将数据、控制权或流程返回到上一个状态、调用者或指定的位置。


重试机制(在消息处理失败时,自动重试

        retry:enabled: true # 开启消费者失败重试initial-interval: 5000ms # 初始失败等待5Smax-attempts: 5 #最大重试次数(包括自身消费的一次)
  @RabbitListener(queues = Constants.RETRY_QUEUE)
//    在 RabbitMQ 中,deliveryTag 是一个用于标识消息的唯一标识符,
//    通常用于确认(acknowledge)或拒绝(reject)消息。
//    当你从 RabbitMQ 队列中消费一条消息时,
//    RabbitMQ 会为该消息分配一个 deliveryTag。
//    这个标签是特定于消息所在的通道(channel)的。public void handlerMessage(Message message) throws UnsupportedEncodingException {long deliveryTag = message.getMessageProperties().getDeliveryTag();System.out.printf("["+Constants.RETRY_QUEUE+"]接收到消息: %s, deliveryTag: %s \n", new String(message.getBody(), "UTF-8"), deliveryTag);
//        int num = 3/0;//        如果消息处理成功,调用 basicAck 确认消息。
//        如果消息处理失败,调用 basicReject 拒绝消息并重新放回队列。System.out.println("业务处理完成");}

 消息处理成功

消息处理失败(因为设定间隔5s重试一次,所以比较慢,最多重试5次结束)


死信队列(DLQ处理无法被正常消费的消息)

设置TTL

 @Bean("ttlQueue2")public Queue ttlQueue2(){return QueueBuilder.durable(Constants.TTL_QUEUE2).ttl(20000).build();}
//还有ttlQueue1和ttlexchange就不网上写了@Bean("ttlBinding")public Binding ttlBinding(@Qualifier("ttlQueue1")Queue queue,@Qualifier("ttlExchange")Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();}@Bean("ttlBinding2")public Binding ttlBinding2(@Qualifier("ttlQueue2")Queue queue,@Qualifier("ttlExchange")Exchange exchange){return BindingBuilder.bind(queue).to(exchange).with("ttl").noargs();}@Bean("ttlQueue3")public Queue ttlQueue3(){Map<String, Object> map = new HashMap<>();
//向 Map 中添加一个参数 x-message-ttl,表示队列中消息的存活时间(TTL)map.put("x-message-ttl", 20000);//withArguments(map):
//将之前定义的参数(x-message-ttl)应用到队列中。return QueueBuilder.durable(Constants.TTL_QUEUE2).withArguments(map).build();  //设置队列的ttl为20s}
//设置消息的TTL
@RequestMapping("ttl1")public String ttl1(){System.out.println("ttl1...");rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl1 test 30s...", message -> {message.getMessageProperties().setExpiration("30000");  //单位: 毫秒, 过期时间为30sreturn message;});rabbitTemplate.convertAndSend(Constants.TTL_EXCHANGE, "ttl", "ttl1 test 10s...", message -> {message.getMessageProperties().setExpiration("10000");  //单位: 毫秒, 过期时间为10sreturn message;});return "消息发送成功";}

这段代码(个人理解),设置一个队列中的过期时间,创建一个持久化队列 ,进入一个消息,消息存在时间为20s,如果消息不进行消费,则过期,进入到死信队列

20S后

ttlQueue2和ttlQueue3 二者的区别

特性ttlQueue2ttlQueue3
实现方式使用 QueueBuilder 的 ttl() 方法直接设置 TTL。手动创建 Map 对象,并通过 withArguments() 设置参数。
代码简洁性更简洁,适合只设置 TTL 的场景。稍显冗长,但更灵活,适合需要设置多个参数的场景。
灵活性只能设置 TTL,无法同时设置其他参数。可以同时设置多个队列参数(如 TTL、死信队列等)。
底层实现内部会创建一个 Map 并设置 x-message-ttl显式创建 Map 并设置 x-message-ttl

注意:

1.设置队列的TTL(存在该队列中所有消息的TTL)

2.设置消息的TTL 

假如,队列TTL是20s,消息的TTL是10s,那么消息的TTL取小值,也就是10s

如果消息的一个为30S和10s取大值


 死信队列常见的3种情况

1.消息被拒绝:消费者在处理消息时,可能因为消息内容错误,处理逻辑异常等原因拒绝处理该消息,可以理解为程序不能正常运行

2.消息过期

3.队列达到最大长度

 

@Bean("normalQueue")public Queue normalQueue(){return QueueBuilder.durable(Constants.NORMAL_QUEUE).deadLetterExchange(Constants.DL_EXCHANGE).deadLetterRoutingKey("dlx").build();}

事务支持(确保消息的发送和确认在事务中执行)

开启事务

  @Bean("transRabbitTemplate")public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory){RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setChannelTransacted(true);  //开启事务return rabbitTemplate;}@Beanpublic RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){return new RabbitTransactionManager(connectionFactory);}

    @Transactional@RequestMapping("/trans")public String trans(){System.out.println("trans test...");transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE, "trans test 1...");
//        int num = 5/0;transRabbitTemplate.convertAndSend("",Constants.TRANS_QUEUE, "trans test 2...");return "消息发送成功";}

不采用事务,第一条成功,第二条失败

采用事务,同时成功或同时失败 

消息分发  决定了消息如何从生产者传递到消费者

消息分发是指消息从生产者传递到消费者的过程。RabbitMQ 提供了多种分发模式,包括直接分发、广播分发、主题分发和头部分发(详见之前的回答)。除此之外,RabbitMQ 还支持以下分发策略:

工作队列模式(Work Queue)

在多个消费者之间分发消息,RabbitMQ 默认采用 轮询(Round-Robin) 的方式将消息均匀地分发给所有消费者。

非公平分发(Non-Fair Dispatch)

默认情况下,RabbitMQ 会将消息均匀地分发给所有消费者,即使某些消费者处理速度较慢。这可能导致某些消费者积压大量消息,而其他消费者空闲。

 限流(Quality of Service, QoS)

限流是指控制消费者从队列中获取消息的速率,以避免消费者过载或消息积压。RabbitMQ 通过 QoS 预取机制(Prefetch Count) 实现限流。

http://www.cadmedia.cn/news/464.html

相关文章:

  • 无锡网站制作启航/淘宝直通车推广怎么收费
  • 保险官方网站/微信推广平台怎么做
  • 有没人做阿里巴巴网站维护的/百度seo推广价格
  • 营销网站建设/百度seo关键词排名
  • wordpress分页功能/安卓系统优化软件
  • 做网站建站/全球搜索引擎市场份额
  • 网站做任务赚钱/网络广告推广平台
  • 做羊水亲子鉴定网站/佛山seo整站优化
  • 惠州网络问政平台/seo西安
  • 荆门哪里有专门做企业网站的/网络营销有哪些例子
  • json做网站的数据库/搜索关键词排名优化服务
  • 佛山家居网站全网营销/优化大师有必要安装吗
  • 室内设计接单网站/seo排名谁教的好
  • 河南省建设厅网站职称网/淄博网络推广公司哪家好
  • 成都h5网站建设/整站优化seo平台
  • 深圳系统开发高端网站建设/优化建议
  • 通辽做网站哪家好/seo建站要求
  • 有哪几个平台做网站/上海推广系统
  • 福建建设管理中心网站/个人如何在百度上做广告
  • 中国十大房地产公司排名/信息流广告优化
  • 塘沽做网站公司/百度浏览器网页版入口
  • 营销型网站建设培训/株洲专业seo优化
  • qq业务网站平台/重庆森林经典台词罐头
  • 上饶网站建设/建网站的软件有哪些
  • 中文网站模板免费下载/怎么在百度上注册店铺
  • 做旅游攻略网站好/策划方案怎么做
  • 深圳商城网站建设报价/西安seo
  • 怎么查网站在哪备案/长沙百度
  • 百度做的网站后台怎么建设/百度竞价推广开户联系方式
  • 保障性住房建设投资中心网站/电脑培训学校在哪里