rabbitmq之fanout、direct、topic三种模式
消费者2
public class ReceiveLogsDirect02 {
private static final String EXCHANGE_NAME ="direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String queueName = "console";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"info");
channel.queueBind(queueName,EXCHANGE_NAME,"warning");
System.out.println("等待接收消息....");
DeliverCallback deliverCallback = (consumerTag,delivery) ->{
String message = new String(delivery.getBody(),"UTF-8");
System.out.println("接收绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{ });
}
}
生产者
public class ReceiveLogsDirect01 {
private static final String EXCHANGE_NAME ="direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"direct");
String queueName ="disk";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"error");
System.out.println("等待接收消息...");
DeliverCallback deliverCallback =(consumerTag,delivery) ->{
String message = new String(delivery.getBody(),"UTF-8");
message = "接收绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message;
System.out.println(message);
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag ->{});
}
}
消费者1结果:
![image-20220211144008541](https://lyq-1308494022.cos.ap-nanjing.myqcloud.com/img/image-20220211144008541.png)
消费者2结果:
![image-20220211144030111](https://lyq-1308494022.cos.ap-nanjing.myqcloud.com/img/image-20220211144030111.png)
## 4. Fanout 模式
### 4.1 简介
Fanout Exchange -不处理路由键。你只需要将简单队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机
绑定的所有队列上。很像子网传播,每台子网的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。
- 可以理解为路由表的模式
- 这种模式不需要RouteKey
- 这种模式需要提前将Exchange与Queue进行绑定,一个Exchange可以绑定多个Queue,一个Queue可以同多个Exchange进行绑定。
### 4.2 demo
消费者1
public class ReceiveLogs01 {
private static final String EXCHANGE_NAME ="logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
String queueName = channel.queueDeclare().getQueue();
//把该临时队列绑定我们的exchange其中routingKey为空字符串
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("等待接收消息,把接收到的消息打印到屏幕上");
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String message =new String(delivery.getBody(),"UTF-8");
System.out.println("消费者1控制台打印接收的消息"+message);
};
channel.basicConsume(queueName,deliverCallback,consumerTag->{});
}
}
消费者2
public class ReceiveLogs02 {
private static final String EXCHANGE_NAME ="logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
String queueName = channel.queueDeclare().getQueue();
//把该临时队列绑定我们的exchange其中routingKey为空字符串
channel.queueBind(queueName,EXCHANGE_NAME,"");
System.out.println("等待接收消息,把接收到的消息打印到屏幕上");
DeliverCallback deliverCallback = (consumerTag,delivery)->{
String message =new String(delivery.getBody(),"UTF-8");
System.out.println("消费者2控制台打印接收的消息"+message);
};
channel.basicConsume(queueName,deliverCallback,consumerTag->{});
}
}
生产者
public class EmitLog {
private static final String EXCHANGE_NAME ="logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");
Scanner scanner = new Scanner(System.in);
System.out.println("请输入消息");
while (scanner.hasNext()){
String message = scanner.nextLine();
channel.basicPublish(EXCHANGE_NAME,"",null,message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发出消息"+message);
}
}
}
消费者1:
![image-20220211154011636](https://lyq-1308494022.cos.ap-nanjing.myqcloud.com/img/image-20220211154011636.png)
消费者2:
![image-20220211154029939](https://lyq-1308494022.cos.ap-nanjing.myqcloud.com/img/image-20220211154029939.png)
生产者:
![image-20220211154048045](https://lyq-1308494022.cos.ap-nanjing.myqcloud.com/img/image-20220211154048045.png)
## 5. Topic 模式
### 5.1 简介
Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”
任何发送到Topic Exchange的消息都会被转发到所有关心RouteKey中指定话题的Queue上。
- 这种模式较为复杂,简单来说,就是每个队列都有其关心的主题,所有的消息都带有一个“标题”(RouteKey),Exchange会将消息转发到所有关注主题能与RouteKey模糊匹配的队列。
- 这种模式需要RouteKey,也许要提前绑定Exchange与Queue。
- 在进行绑定时,要提供一个该队列关心的主题,如“#.log.#”表示该队列关心所有涉及log的消息(一个RouteKey为”MQ.log.error”的消息会被转发到该队列)。
- “#”表示0个或若干个关键字,“*”表示一个关键字。如“log.*”能与“log.warn”匹配,无法与“log.warn.timeout”匹配;但是“log.#”能与上述两者匹配。
### 5.2 demo
消费者1
public class ReceiveLosTopic01 {
private static final String EXCHANGE_NAME ="topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//声明Q1队列与绑定关系
String queueName ="Q1";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"*.orange.*");
System.out.println("等待接收消息...");
DeliverCallback deliverCallback =(consumerTag,delivery)->{
String message = new String(delivery.getBody(),"UTF-8");
System.out.println("接收队列:"+queueName+"绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag->{ });
}
}
消费者2
public class ReceiveLosTopic02 {
private static final String EXCHANGE_NAME ="topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME,"topic");
//声明Q2队列与绑定关系
String queueName ="Q2";
channel.queueDeclare(queueName,false,false,false,null);
channel.queueBind(queueName,EXCHANGE_NAME,"*.*.rabbit");
channel.queueBind(queueName,EXCHANGE_NAME,"lazy.#");
System.out.println("等待接收消息...");
DeliverCallback deliverCallback =(consumerTag,delivery)->{
String message = new String(delivery.getBody(),"UTF-8");
System.out.println("接收队列:"+queueName+"绑定键:"+delivery.getEnvelope().getRoutingKey()+",消息:"+message);
};
channel.basicConsume(queueName,true,deliverCallback,consumerTag->{ });
}
}
生产者
public class EmitLogTopic {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
Map<String, String> map = new HashMap<>();
/***
Q1 -->绑定的是中间带orange带3个单词的字符串(*.orange.*)
Q2 -->绑定的是最后一个单词是rabbit的三个单词(*.*.rabbit),第一个单词是lazy的多个单词(lazy.#)
*/
map.put("quick.orange.rabbit", "被队列Q1Q2接收到");
map.put("lazy.orange.elephant", "被队列Q1Q2接收到");
map.put("quick.orange.fox", "被队列Q1接收到");
map.put("lazy.brown.fox", "被队列Q2接收到");
map.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列Q2接收一次");
map.put("quick.brown.fox", "不匹配任何绑定不会被任何队列接收会被丢弃");
map.put("quick.orange.male.rabbit", "四个单词不匹配任何绑定会被丢弃");
map.put("lazy.orange.male.rabbit", "四个单词但匹配Q2");
for (Map.Entry<String, String> bindingKeyEntry : map.entrySet()) {
String bindingKey = bindingKeyEntry.getKey();
String message = bindingKeyEntry.getValue();
channel.basicPublish(EXCHANGE_NAME, bindingKey, null,
message.getBytes("UTF-8"));
System.out.println("生产者发出消息" + message);
}
}
}
消费者1:
![image-20220211161706521](https://lyq-1308494022.cos.ap-nanjing.myqcloud.com/img/image-20220211161706521.png)
消费者2:
![image-20220211161719020](https://lyq-1308494022.cos.ap-nanjing.myqcloud.com/img/image-20220211161719020.png)
生产者
![image-20220211161733742](https://lyq-1308494022.cos.ap-nanjing.myqcloud.com/img/image-20220211161733742.png)
参考:https://www.cnblogs.com/shenyixin/p/9084249.html