目录

Rabbitmq之死信队列

李羽秋
李羽秋 2022年02月13日  ·  阅读 863

Rabbitmq之死信队列

1.什么是死信队列?

一般来说,producer将消息投递到queue中,consumer将从queue取出进行消费。但是由于某种原因,consumer无法对queue中的消息进行消费,并且不进行后续的处理,就变成了死信,所有的死信都会放到死信队列当中。“死信”消息会被RabbitMq进行特殊处理。如果配置了死信队列的消息,那么该消息将会被丢尽死信队列当中,如果没有配置,则该消息将会被丢弃。

2.死信的来源

  • 消息被拒绝

  • 消息TTL过期

  • 队列达到最大长度(队列满了,无法再添加到mq中)

    image-20220213174018003

3. 模仿消息TTL过期

生产者
public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        //设置消息的TTL事时间
        AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
        //该消息是用作演示队列个数限制
        for (int i = 1; i < 11; i++) {
            String message ="info" +i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发送消息:"+message);
        }
    }

}
-------------------------
消费者1
public class Consumer01 {
    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE ="dead_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通交换机 类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        channel.exchangeDeclare(DEAD_EXCHANGE,"direct");

        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue,false,false,false,null);
        //死信队列绑定死信交换机与routingKey
        channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");

        //正常队列绑定死信队列消息
        Map<String,Object> params = new HashMap<>();
        //正常队列设置死信交换机 参数key是固定值
        params.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //正常队列设置死信routing-key 参数是固定值
        params.put("x-dead-letter-routing-key","lisi");

        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue,false,false,false,params);
        channel.queueBind(normalQueue,NORMAL_EXCHANGE,"zhangsan");

        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag,delivery) ->{
          String message =new String(delivery.getBody(),"UTF-8");
          System.out.println("Consumer01接收到消息"+message);
        };
        channel.basicConsume(normalQueue,true,deliverCallback,cancel->{});
    }

}
-------------------------
消费者2

public class Consumer02 {
    private  static  final String DEAD_EXCHANGE ="dead_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue,false,false,false,null);
        channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");
        System.out.println("等待接收死信队列消息....");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
          String message = new String(delivery.getBody(),"UTF-8");
            System.out.println("Consumer02接收死信队列的消息"+message);
        };
        channel.basicConsume(deadQueue,true,deliverCallback,cancel->{});
    }

}

下图可以看到正常队列的十条消息未载规定时间内被消费变成了死信队列

image-20220213173621896

下面启动消费者2来消费死信队列,我们可以看到死信队列里面没有可消费的消息

image-20220213173939167

4.模仿队列达到最大长度

生产者
--------------------
public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        //该消息是用作演示队列个数限制
        for (int i = 1; i < 11; i++) {
            String message ="info" +i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发送消息:"+message);
        }
    }

}
-------------------
消费者1
public class Consumer01 {
    //普通交换机名称
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE ="dead_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通交换机 类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        channel.exchangeDeclare(DEAD_EXCHANGE,"direct");

        //声明死信队列
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue,false,false,false,null);
        //死信队列绑定死信交换机与routingKey
        channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");




        //正常队列绑定死信队列消息
        Map<String,Object> params = new HashMap<>();
        //正常队列设置死信交换机 参数key是固定值
        params.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //正常队列设置死信routing-key 参数是固定值
        params.put("x-dead-letter-routing-key","lisi");
        //设置正常队列长度
        params.put("x-max-length",6);

        String normalQueue = "normal-queue";
        channel.queueDeclare(normalQueue,false,false,false,params);
        channel.queueBind(normalQueue,NORMAL_EXCHANGE,"zhangsan");

        System.out.println("等待接收消息.....");
        DeliverCallback deliverCallback = (consumerTag,delivery) ->{
          String message =new String(delivery.getBody(),"UTF-8");
          System.out.println("Consumer01接收到消息"+message);
        };
        channel.basicConsume(normalQueue,true,deliverCallback,cancel->{});
    }

}
------------------
消费者2

public class Consumer02 {
    private  static  final String DEAD_EXCHANGE ="dead_exchange";
    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue,false,false,false,null);
        channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");
        System.out.println("等待接收死信队列消息....");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
          String message = new String(delivery.getBody(),"UTF-8");
            System.out.println("Consumer02接收死信队列的消息"+message);
        };
        channel.basicConsume(deadQueue,true,deliverCallback,cancel->{});
    }

}

我们可以看到当消息数量达到正常队列长度时其余消息将会变成死信

image-20220213174847624

这时我们启动消费者2消费死信队列,可以看到死信队列消息已被消费

image-20220213175009581

5.模仿消息被拒绝

生产者
public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        //该消息是用作演示队列个数限制
        for (int i = 1; i < 11; i++) {
            String message ="info" +i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null,message.getBytes(StandardCharsets.UTF_8));
            System.out.println("生产者发送消息:"+message);
        }
    }

}
----------------------
消费者1
public class Consumer01 {
    //普通交换机名称
    private static final String NORMAL_EXCHANGE ="normal_exchange";
    //死信交换机名称
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        //声明死信和普通交换机 类型为direct
        channel.exchangeDeclare(NORMAL_EXCHANGE,"direct");
        channel.exchangeDeclare(DEAD_EXCHANGE,"direct");

        //声明死信队列
        String deadQueue ="dead-queue";
        channel.queueDeclare(deadQueue,false,false,false,null);
        channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");

        //正常队列绑定死信队列的消息
        Map<String,Object> params = new HashMap<>();
        //正常队列设置死信routing-key 参数key是固定值
        params.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //正常队列设置死信routing-key 参数key是固定值
        params.put("x-dead-letter-routing-key","lisi");
        String normalQueue = "normal_queue";
        channel.queueDeclare(normalQueue,false,false,false,params);
        channel.queueBind(normalQueue,NORMAL_EXCHANGE,"zhangsan");

        System.out.println("等待接收消息......");
        DeliverCallback deliverCallback = ((consumerTag, message) -> {
           String mes = new String(message.getBody(),"UTF-8");
           if (mes.equals("info5")){
               System.out.println("Consumer01接收消息"+mes+"并拒绝签收该消息");
               channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
           }else {
               System.out.println("Consumer01接收到消息"+mes);
               channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
           }
        });
        channel.basicConsume(normalQueue,false,deliverCallback,consumerTag -> {});
    }

}
------------------
消费者2

public class Consumer02 {
    private  static  final String DEAD_EXCHANGE ="dead_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(DEAD_EXCHANGE,"direct");
        String deadQueue = "dead-queue";
        channel.queueDeclare(deadQueue,false,false,false,null);
        channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");
        System.out.println("等待接收死信队列消息....");
        DeliverCallback deliverCallback = (consumerTag,delivery)->{
          String message = new String(delivery.getBody(),"UTF-8");
            System.out.println("Consumer02接收死信队列的消息"+message);
        };
        channel.basicConsume(deadQueue,true,deliverCallback,cancel->{});
    }

}

我们可以看到有一个消息被拒绝了,便加入了死信队列

image-20220213180853458

参考:https://blog.csdn.net/qq_43644198/article/details/109765486

分类:
标签: