articleList

10-RabbitMQ 死信队列

2025/03/17 posted in  RabbitMQ
Tags: 

RabbitMQ 死信队列 + TTL 介绍

  • 什么是 TTL

    • time to live 消息存活时间

    • 如果消息在存活时间内未被消费,则会被清除

    • RabbitMQ 支持两种 ttl 设置

      • 单独消息进行配置 ttl
      • 整个队列进行配置 ttl(居多)
  • 什么是 rabbitmq 的死信队列

    • 没有被及时消费的消息存放的队列
  • 什么是 rabbitmq 的死信交换机

    • Dead Letter Exchange(死信交换机,缩写:DLX)当消息成为死信后,会被重新发送到另一个交换机,这个交换机就是 DLX 死信交换机

  • 消息有哪几种情况成为死信

    • 消费者拒收消息(basic.reject/ basic.nack),并且没有重新入队 requeue=false
    • 消息在队列中未被消费,且超过队列或者消息本身的过期时间 TTL(time-to-live)
    • 队列的消息长度达到极限
    • 结果:消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列

  • RabbitMQ 管控台消息 TTL 测试

    • 队列过期时间使用参数,对整个队列消息统一过期

      • x-message-ttl
      • 单位 ms(毫秒)
    • 消息过期时间使用参数(如果队列头部消息未过期,队列中级消息已经过期,已经还在队列里面)

      • expiration
      • 单位 ms(毫秒)
    • 两者都配置的话,时间短的先触发

  • RabbitMQ Web 控制台测试

    • 新建死信交换机(和普通没区别)

    • 新建死信队列(和普通没区别)

    • 死信交换机和死信队列绑定

    • 新建普通队列,设置过期时间,指定死信交换机

    • 测试:直接在 web 控制台往 product_queue 发送消息即可

RabbitMQ 延迟队列介绍和应用场景

  • 什么是延迟队列

    • 一种带有延迟功能的消息队列,Producer 将消息发送到消息队列服务端,但并不期望这条消息立马投递,而是推迟到在当前时间点之后的某一个时间投递到 Consumer 进行消费,该消息即定时消息
  • 使用场景

    • 通过消息触发一些定时任务,比如在某一固定时间点向用户发送提醒消息
    • 用户登录之后 5 分钟给用户做分类推送、用户多少天未登录给用户做召回推送
    • 消息生产和消费有时间窗口要求:比如在天猫电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。
  • Cloud 微服务大课训练营里面的应用

    • 优惠券回收
    • 商品库存回收
  • 业界的一些实现方式

    • 定时任务高精度轮询

    • 采用 RocketMQ 自带延迟消息功能

    • RabbitMQ 本身是不支持延迟队列的,怎么办?

      • 结合死信队列的特性,就可以做到延迟消息

综合实战-主题模式+延迟消息-实现新商家规定时间内上架商品检查

  • 背景

    • JD、淘系、天猫、拼多多电商平台,规定新注册的商家,审核通过后需要在【规定时间】内上架商品,否则冻结账号
  • 使用 RabbitMQ 实现

  • 死信交换机和死信队列开发

    /**
    * 死信队列
    */
    public static final String LOCK_MERCHANT_DEAD_QUEUE = "lock_merchant_dead_queue";
    
    /**
    * 死信交换机
    */
    public static final String LOCK_MERCHANT_DEAD_EXCHANGE = "lock_merchant_dead_exchange";
    
    /**
    * 进入死信队列的路由key
    */
    public static final String LOCK_MERCHANT_ROUTING_KEY = "lock_merchant_routing_key";
    
    
    /**
    * 创建死信交换机
    *
    * @return
    */
    @Bean
    public Exchange lockMerchantDeadExchange() {
        return new TopicExchange(LOCK_MERCHANT_DEAD_EXCHANGE, true, false);
    }
    
    /**
    * 创建死信队列
    *
    * @return
    */
    @Bean
    public Queue lockMerchantDeadQueue() {
        return QueueBuilder.durable(LOCK_MERCHANT_DEAD_QUEUE).build();
    }
    
    /**
    * 绑定死信交换机和死信队列
    *
    * @return
    */
    @Bean
    public Binding lockMerchantBinding() {
        return new Binding(LOCK_MERCHANT_DEAD_QUEUE, Binding.DestinationType.QUEUE,
                LOCK_MERCHANT_DEAD_EXCHANGE, LOCK_MERCHANT_ROUTING_KEY, null);
    }
    
  • topic 交换机和队列开发, 绑定死信交换机

    /**
    * 普通队列,绑定的个死信交换机
    */
    public static final String NEW_MERCHANT_QUEUE = "new_merchant_queue";
    
    /**
    * 普通的topic交换机
    */
    public static final String NEW_MERCHANT_EXCHANGE = "new_merchant_exchange";
    
    /**
    * 路由key
    */
    public static final String NEW_MERCHANT_ROUTIING_KEY = "new_merchant_routing_key";
    
    
    /**
    * 创建普通交换机
    *
    * @return
    */
    @Bean
    public Exchange newMerchantExchange() {
        return new TopicExchange(NEW_MERCHANT_EXCHANGE, true, false);
    }
    
    /**
    * 创建普通队列
    *
    * @return
    */
    @Bean
    public Queue newMerchantQueue() {
        Map<String, Object> args = new HashMap<>(3);
        //消息过期后,进入到死信交换机
        args.put("x-dead-letter-exchange", LOCK_MERCHANT_DEAD_EXCHANGE);
    
        //消息过期后,进入到死信交换机的路由key
        args.put("x-dead-letter-routing-key", LOCK_MERCHANT_ROUTING_KEY);
    
        //过期时间,单位毫秒
        args.put("x-message-ttl", 10000);
    
        return QueueBuilder.durable(NEW_MERCHANT_QUEUE).withArguments(args).build();
    }
    
    /**
    * 绑定交换机和队列
    *
    * @return
    */
    @Bean
    public Binding newMerchantBinding() {
        return new Binding(NEW_MERCHANT_QUEUE, Binding.DestinationType.QUEUE,
                NEW_MERCHANT_EXCHANGE, NEW_MERCHANT_ROUTIING_KEY, null);
    }
    
  • 讲解 RabbitMQ 的案例实战-消息生产和消费

    • 消息生产

      • 投递到普通的 topic 交换机
      • 消息过期,进入死信交换机
    • 消息消费

      • 消费者监听死信交换机的队列