The five RabbitMQ queue patterns
RabbitMQ offers five queue patterns:
1. Simple queue
Model:

P: the message producer
Red: the queue
C: the message consumer
The producer sends messages to the queue, and the consumer retrieves messages from it.
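As a rough in-memory analogy (not RabbitMQ code), the model can be sketched with a plain Java queue. The class name SimpleQueueSketch and its send/receive methods are made up for illustration; a real broker runs as a separate process and adds networking, persistence and acknowledgements on top of this idea.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for the P -> queue -> C model; not part of the RabbitMQ client.
class SimpleQueueSketch {
    // Plays the role of the broker-side queue (the red box in the model).
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // P: the producer appends a message to the queue.
    void send(String message) {
        queue.add(message);
    }

    // C: the consumer takes the oldest message, or null if the queue is empty.
    String receive() {
        return queue.poll();
    }

    public static void main(String[] args) {
        SimpleQueueSketch q = new SimpleQueueSketch();
        q.send("Hello Simple QUEUE!");
        System.out.println(q.receive());
    }
}
```

Messages leave the queue in the order they were sent, and each message is handed out once.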
Example (create a Maven project):
1.1 Import the dependencies
    <dependencies>
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
            <version>5.7.3</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.17</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>
    </dependencies>
1.2 Write a utility class that obtains the MQ connection
    package com.self.rabbitmq.utils;

    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class ConnectionUtil {

        // Opens a new connection to the broker.
        public static Connection getConnection() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            factory.setPort(5672);               // AMQP port
            factory.setVirtualHost("host100");   // virtual host to use
            factory.setUsername("admin");
            factory.setPassword("admin");
            return factory.newConnection();
        }
    }
1.3 The producer sends a message to the queue
    package com.self.rabbitmq.simple;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.self.rabbitmq.utils.ConnectionUtil;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class SimpleProducer {

        private static final String QUEUE_NAME = "simple_QUEUE";

        public static void main(String[] args) throws IOException, TimeoutException {
            // Get a connection and open a channel on it
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();

            // Declare the queue
            boolean durable = false;
            boolean exclusive = false;
            boolean autoDelete = false;
            channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, null);

            String msg = "Hello Simple QUEUE!";

            // Publish through the default exchange (""), with the queue name as routing key
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("------Producer send msg : " + msg);

            channel.close();
            connection.close();
        }
    }
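Run the main method; the console prints:

On the RabbitMQ server's management page, a new queue has appeared:
Fetch the message manually from the queue (click the queue name to open it):

Let a consumer fetch the message through code: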
1.4 The consumer retrieves messages from the queue
    package com.self.rabbitmq.simple;

    import com.rabbitmq.client.*;
    import com.self.rabbitmq.utils.ConnectionUtil;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class SimpleConsumer {

        private static final String QUEUE_NAME = "simple_QUEUE";

        public static void main(String[] args) throws Exception {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();

            // Declare the queue (same settings as the producer)
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Callback invoked for every message delivered to this consumer
            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("[x] consumer '" + message + "'");
                }
            };

            // Start consuming; true = automatic acknowledgement
            channel.basicConsume(QUEUE_NAME, true, consumer);
        }
    }

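1.5 Explanation of the methods above
    channel.queueDeclare(QUEUE_NAME, durable, exclusive, autoDelete, Map<String, Object> arguments);
QUEUE_NAME: the queue name;
durable: whether the queue is persistent, i.e. whether it survives a broker restart;
exclusive: whether the queue is exclusive to the current connection;
autoDelete: whether the queue is deleted automatically once its last consumer unsubscribes;
arguments: optional extra arguments for the queue;
    channel.basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body);
exchange: the exchange; "" selects the default exchange
routingKey: the routing key; with the default exchange it is the name of the target queue, which is why QUEUE_NAME is passed here
props: message properties
body: the message content as bytes;
    DefaultConsumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
        }
    };
handleDelivery is the callback invoked for each delivered message; body holds the message content and envelope carries delivery metadata such as the delivery tag.
    channel.basicConsume(QUEUE_NAME, true, consumer);
The second argument is autoAck: true means a message counts as acknowledged as soon as it has been delivered.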
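1.6 Shortcomings of the simple queue
Coupling is tight: producer and consumer are paired one to one (several consumers cannot all consume the same message), and when the queue name changes, both sides have to be changed together.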
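2. Work queue mode
Model:
One producer, two consumers
Each message is delivered to only one consumer.
There are two dispatch strategies: round-robin dispatch and fair dispatch
2.1 Round-robin dispatch
Producer: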
    package com.self.rabbitmq.work;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.self.rabbitmq.utils.ConnectionUtil;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class WorkProducer {

        private static final String QUEUE_NAME = "work_QUEUE";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();

            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Send 50 messages, pausing a little longer after each one
            for (int i = 0; i < 50; i++) {
                String msg = "hello :" + i;
                System.out.println("[WQ] producer : " + msg);
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
                try {
                    Thread.sleep(i * 20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            channel.close();
            connection.close();
        }
    }
Consumer 1:
    package com.self.rabbitmq.work;

    import com.rabbitmq.client.*;
    import com.self.rabbitmq.utils.ConnectionUtil;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class WorkConsumer1 {

        private static final String QUEUE_NAME = "work_QUEUE";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("[1] Consumer get : " + message);
                    try {
                        Thread.sleep(2000);   // simulate a slow consumer
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[1] done");
                    }
                }
            };

            // Automatic acknowledgement
            boolean autoAck = true;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
Consumer 2:
    package com.self.rabbitmq.work;

    import com.rabbitmq.client.*;
    import com.self.rabbitmq.utils.ConnectionUtil;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class WorkConsumer2 {

        private static final String QUEUE_NAME = "work_QUEUE";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("[2] consumer get : " + message);
                    try {
                        Thread.sleep(1000);   // twice as fast as consumer 1
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[2] done");
                    }
                }
            };

            // Automatic acknowledgement
            boolean autoAck = true;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
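Result screenshots:


Conclusion:
After the producer has sent all its messages, both consumers have received the same number of messages, but with different contents;
This is the round-robin dispatch strategy: no matter which consumer is busy and which is idle, neither gets an extra message; messages are handed out strictly in turn, one for you, one for me;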
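The round-robin behaviour can be sketched without a broker. The following is a simplified model, assuming the broker simply hands message i to consumer i modulo the number of consumers; RoundRobinSketch and dispatch are hypothetical names, not part of the RabbitMQ client, and which consumer gets the first message in a real run depends on the order in which the consumers subscribed.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of round-robin dispatch; it ignores how busy each consumer is.
class RoundRobinSketch {
    // Assigns message i to consumer (i % consumerCount).
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            perConsumer.get(i % consumerCount).add(messages.get(i));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 50; i++) {
            messages.add("hello :" + i);
        }
        List<List<String>> result = dispatch(messages, 2);
        System.out.println("consumer 1 got " + result.get(0).size() + " messages");
        System.out.println("consumer 2 got " + result.get(1).size() + " messages");
    }
}
```

With 50 messages and two consumers, each list ends up with 25 entries, one holding the even-numbered messages and the other the odd-numbered ones, regardless of the consumers' processing speed.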
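2.2 Fair dispatch
Fair dispatch requires two things:
- Call channel.basicQos(prefetchCount = 1): the broker then sends a consumer at most one unacknowledged message at a time, so each consumer handles one message at a time;
- The consumer must turn off automatic acknowledgement and acknowledge manually;
In code, on the consumer side:
- channel.basicQos(1);
- channel.basicAck(envelope.getDeliveryTag(), false);
- boolean autoAck = false;
Producer: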
    package com.self.rabbitmq.workfair;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.self.rabbitmq.utils.ConnectionUtil;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class WorkProducer {

        private static final String QUEUE_NAME = "work_QUEUE";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();

            // Declare the queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // No basicQos call here: the prefetch limit applies to deliveries to
            // consumers, so it is set in the consumer classes.

            for (int i = 0; i < 50; i++) {
                String msg = "hello :" + i;
                System.out.println("[WQ] producer : " + msg);
                channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8));
                try {
                    Thread.sleep(i * 20);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            channel.close();
            connection.close();
        }
    }
Consumer 1:
    package com.self.rabbitmq.workfair;

    import com.rabbitmq.client.*;
    import com.self.rabbitmq.utils.ConnectionUtil;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class WorkConsumer1 {

        private static final String QUEUE_NAME = "work_QUEUE";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // At most one unacknowledged message at a time
            channel.basicQos(1);

            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("[1] Consumer get : " + message);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[1] done");
                        // Manual acknowledgement of this single message
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };

            // Automatic acknowledgement turned off
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
Consumer 2:
    package com.self.rabbitmq.workfair;

    import com.rabbitmq.client.*;
    import com.self.rabbitmq.utils.ConnectionUtil;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class WorkConsumer2 {

        private static final String QUEUE_NAME = "work_QUEUE";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // At most one unacknowledged message at a time
            channel.basicQos(1);

            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String message = new String(body, StandardCharsets.UTF_8);
                    System.out.println("[2] consumer get : " + message);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[2] done");
                        // Manual acknowledgement of this single message
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };

            // Automatic acknowledgement turned off
            boolean autoAck = false;
            channel.basicConsume(QUEUE_NAME, autoAck, consumer);
        }
    }
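Result screenshots:



Conclusion:
After the producer has sent all its messages, consumer 1 has received fewer messages and consumer 2 has received more;
Put simply, the more capable consumer does more work; that is fair dispatch;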
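A small simulation, under simplifying assumptions, shows why the faster consumer ends up with more messages. It assumes that all messages are already waiting in the queue and that the broker gives the next message to whichever consumer acknowledges first (prefetch 1 with manual ack). FairDispatchSketch and simulate are hypothetical names; the processing times mirror the Thread.sleep values used by the two consumers.

```java
// Hypothetical simulation of fair dispatch (prefetch = 1, manual ack) on a simulated clock.
class FairDispatchSketch {
    // Returns how many messages each consumer handles when a consumer only
    // receives a new message after acknowledging its previous one.
    static int[] simulate(int messageCount, long[] processingMillis) {
        int n = processingMillis.length;
        long[] freeAt = new long[n];   // simulated time of each consumer's next ack
        int[] handled = new int[n];
        for (int m = 0; m < messageCount; m++) {
            // Pick the consumer that becomes free first (lowest index wins ties).
            int next = 0;
            for (int c = 1; c < n; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            freeAt[next] += processingMillis[next];
            handled[next]++;
        }
        return handled;
    }

    public static void main(String[] args) {
        // Consumer 1 needs 2000 ms per message, consumer 2 needs 1000 ms.
        int[] handled = simulate(50, new long[] {2000, 1000});
        System.out.println("consumer 1 handled " + handled[0]);
        System.out.println("consumer 2 handled " + handled[1]);
    }
}
```

Under these assumptions, consumer 2 handles roughly twice as many messages as consumer 1 (33 versus 17 out of 50). A real run will give somewhat different counts, because the producer paces its messages and network timing varies.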
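3. Publish/subscribe mode
So far, each message could be consumed by only one consumer. What if one message should be consumed by several consumers? That is what the publish/subscribe model is for:

Explanation:
- One producer, multiple consumers;
- Each consumer has its own queue;
- The producer does not send messages directly to a queue but to an exchange;
- Every queue must be bound to the exchange;
- A message sent by the producer passes through the exchange into the queues, so that one message reaches multiple consumers;
Producer: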
    package com.self.rabbitmq.subscribe;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.self.rabbitmq.utils.ConnectionUtil;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class SubscribeProducer {

        private static final String EXCHANGE_NAME = "exchange_fanout";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();

            // Declare a fanout exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

            String msg = "hello subscribe";

            // Publish to the exchange; a fanout exchange ignores the routing key
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("subscribe : " + msg);

            channel.close();
            connection.close();
        }
    }
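Output:

On the server you can see that an exchange now exists:

Note: an exchange cannot store messages, so if no queue is bound to the exchange, the messages it receives are lost.
Consumer 1: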
    package com.self.rabbitmq.subscribe;

    import com.rabbitmq.client.*;
    import com.self.rabbitmq.utils.ConnectionUtil;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class SubscribeConsumer1 {

        private static final String QUEUE_NAME = "subscribe_QUEUE_1";
        private static final String EXCHANGE_NAME = "exchange_fanout";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();

            // Declare this consumer's own queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Bind the queue to the exchange (the exchange must already exist,
            // e.g. because the producer has declared it)
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

            channel.basicQos(1);

            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println("[1] subscribeConsumer : " + msg);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[1] done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };

            // Manual acknowledgement
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }
Consumer 2:
    package com.self.rabbitmq.subscribe;

    import com.rabbitmq.client.*;
    import com.self.rabbitmq.utils.ConnectionUtil;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class SubscribeConsumer2 {

        private static final String QUEUE_NAME = "subscribe_QUEUE_2";
        private static final String EXCHANGE_NAME = "exchange_fanout";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();

            // Declare this consumer's own queue
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Bind the queue to the exchange (the exchange must already exist)
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

            channel.basicQos(1);

            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println("[2] subscribeConsumer : " + msg);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[2] done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };

            // Manual acknowledgement
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }
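Result screenshots:



The RabbitMQ management page shows that the exchange has two queues bound to it:

Conclusion:
The message is sent to the exchange, the queues are bound to the exchange, and each consumer has its own queue, so both consumers receive the message the producer sent to the exchange.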
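The fanout behaviour can be modelled in a few lines of plain Java. This is only an in-memory sketch under the assumption that a fanout exchange copies each message into every queue bound at publish time; FanoutSketch, bind and publish are hypothetical names, not RabbitMQ client calls.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of a fanout exchange.
class FanoutSketch {
    // Queues currently bound to the exchange, keyed by queue name.
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    // Rough counterpart of queueDeclare + queueBind: create the queue and bind it.
    Queue<String> bind(String queueName) {
        return boundQueues.computeIfAbsent(queueName, name -> new ArrayDeque<>());
    }

    // Rough counterpart of basicPublish: copy the message into every bound queue.
    // Returns the number of queues reached; 0 means the message was dropped.
    int publish(String message) {
        for (Queue<String> q : boundQueues.values()) {
            q.add(message);
        }
        return boundQueues.size();
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        System.out.println(exchange.publish("lost"));            // no queue bound yet
        Queue<String> q1 = exchange.bind("subscribe_QUEUE_1");
        Queue<String> q2 = exchange.bind("subscribe_QUEUE_2");
        System.out.println(exchange.publish("hello subscribe"));
        System.out.println(q1.peek() + " / " + q2.peek());
    }
}
```

The first publish reaches no queue and the message is gone, which matches the note that an exchange does not store messages; the second publish lands in both queues.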
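4. Routing mode
Model:

Explanation:
Each message is published with a routing key (routingKey), and each queue is bound to the exchange with a key of its own. The exchange puts a message into a queue only if the message's routing key matches a key that the queue was bound with.
(Here error, info and so on are the routing keys.)
Producer: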
    package com.self.rabbitmq.routing;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.self.rabbitmq.utils.ConnectionUtil;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class RoutingProducer {

        private static final String EXCHANGE_NAME = "exchange_direct";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();

            // Declare a direct exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "direct");

            String msg = "hello Routing";

            // Publish with a routing key
            String routingKey = "error";
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("RoutingProducer : " + msg);

            channel.close();
            connection.close();
        }
    }
Consumer 1:
    package com.self.rabbitmq.routing;

    import com.rabbitmq.client.*;
    import com.self.rabbitmq.utils.ConnectionUtil;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class RoutingConsumer1 {

        private static final String EXCHANGE_NAME = "exchange_direct";
        private static final String QUEUE_NAME = "routing_QUEUE_1";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Bind the queue with the key "error" only
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");

            channel.basicQos(1);

            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println("[1] routingConsumer : " + msg);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[1] done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };

            // Manual acknowledgement
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }
Consumer 2:
    package com.self.rabbitmq.routing;

    import com.rabbitmq.client.*;
    import com.self.rabbitmq.utils.ConnectionUtil;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class RoutingConsumer2 {

        private static final String QUEUE_NAME = "routing_QUEUE_2";
        private static final String EXCHANGE_NAME = "exchange_direct";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Bind the queue with three keys
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "error");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "info");
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "warning");

            channel.basicQos(1);

            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println("[2] RoutingConsumer : " + msg);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[2] done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };

            // Manual acknowledgement
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }
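Result screenshots:



The output shows that when the routing key is error, both consumers receive the message;
When the producer's routingKey is changed to info, consumer 1 no longer receives the message; only consumer 2, whose queue is also bound with info, receives it.
    String routingKey = "info";



Conclusion: in routing mode, the routingKey of a message has to correspond to a key that a queue was bound with. A queue can be bound with several keys, and it receives a message only when one of those keys equals the message's routingKey.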
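The matching rule of a direct exchange can be sketched as a lookup table. This is a simplified in-memory model, assuming exact string equality between the message's routing key and the key a queue was bound with; DirectExchangeSketch, bind and route are hypothetical names, not RabbitMQ client calls.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of direct-exchange routing.
class DirectExchangeSketch {
    // Binding key -> names of the queues bound with that key.
    private final Map<String, Set<String>> bindings = new HashMap<>();

    // Rough counterpart of queueBind(queue, exchange, key).
    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queueName);
    }

    // Returns the queues that would receive a message published with this routing key.
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("routing_QUEUE_1", "error");
        exchange.bind("routing_QUEUE_2", "error");
        exchange.bind("routing_QUEUE_2", "info");
        exchange.bind("routing_QUEUE_2", "warning");

        System.out.println(exchange.route("error")); // both queues
        System.out.println(exchange.route("info"));  // only routing_QUEUE_2
        System.out.println(exchange.route("debug")); // no queue: the message is dropped
    }
}
```

The bindings in main mirror the two consumers: a message keyed error reaches both queues, while one keyed info reaches only the second.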
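5. Topic mode
Model:

The routing key is matched against a pattern: in the key a queue is bound with, * stands for exactly one word and # stands for zero or more words, where words are separated by dots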
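How such a pattern match works can be sketched in plain Java. This is an illustrative re-implementation, not the broker's actual code; it assumes dot-separated words, with * matching exactly one word and # matching zero or more words. TopicMatchSketch and matches are hypothetical names.

```java
// Hypothetical re-implementation of topic pattern matching, for illustration only.
class TopicMatchSketch {
    // True if a message with routingKey would be routed to a queue bound with bindingKey.
    static boolean matches(String bindingKey, String routingKey) {
        return matches(bindingKey.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean matches(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length;          // pattern used up: all words must be used up too
        }
        if (pattern[p].equals("#")) {
            // '#' absorbs zero words, or absorbs one word and stays active.
            return matches(pattern, p + 1, words, w)
                    || (w < words.length && matches(pattern, p, words, w + 1));
        }
        if (w == words.length) {
            return false;                      // pattern still expects a word
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return wordMatches && matches(pattern, p + 1, words, w + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("goods.add", "goods.add"));    // exact match
        System.out.println(matches("goods.#", "goods.add"));      // '#' absorbs "add"
        System.out.println(matches("goods.add", "goods.delete")); // different second word
        System.out.println(matches("goods.#", "goods.delete"));   // '#' absorbs "delete"
    }
}
```

With the keys used in this section, goods.add matches only the routing key goods.add, while goods.# matches goods.add, goods.delete and any other key that starts with the word goods.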
Producer:
    package com.self.rabbitmq.topic;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.self.rabbitmq.utils.ConnectionUtil;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class TopicProducer {

        private static final String EXCHANGE_NAME = "topic_EXCHANGE";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            Channel channel = connection.createChannel();

            // Declare a topic exchange
            channel.exchangeDeclare(EXCHANGE_NAME, "topic");

            String msg = "商品...";   // "goods..."

            // Publish with a dot-separated routing key
            String routingKey = "goods.add";
            channel.basicPublish(EXCHANGE_NAME, routingKey, null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("TopicProducer : " + msg);

            channel.close();
            connection.close();
        }
    }
Consumer 1:
    package com.self.rabbitmq.topic;

    import com.rabbitmq.client.*;
    import com.self.rabbitmq.utils.ConnectionUtil;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class TopicConsumer1 {

        private static final String QUEUE_NAME = "topic_QUEUE_1";
        private static final String EXCHANGE_NAME = "topic_EXCHANGE";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Bind with an exact key: only "goods.add" matches
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.add");

            channel.basicQos(1);

            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println("[1] topicConsumer : " + msg);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[1] done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };

            // Manual acknowledgement
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }
Consumer 2:
    package com.self.rabbitmq.topic;

    import com.rabbitmq.client.*;
    import com.self.rabbitmq.utils.ConnectionUtil;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.TimeoutException;

    public class TopicConsumer2 {

        private static final String QUEUE_NAME = "topic_QUEUE_2";
        private static final String EXCHANGE_NAME = "topic_EXCHANGE";

        public static void main(String[] args) throws IOException, TimeoutException {
            Connection connection = ConnectionUtil.getConnection();
            final Channel channel = connection.createChannel();

            channel.queueDeclare(QUEUE_NAME, false, false, false, null);

            // Bind with a pattern: every routing key starting with the word "goods" matches
            channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "goods.#");

            channel.basicQos(1);

            DefaultConsumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body) throws IOException {
                    String msg = new String(body, StandardCharsets.UTF_8);
                    System.out.println("[2] topicConsumer : " + msg);
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[2] done");
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                }
            };

            // Manual acknowledgement
            channel.basicConsume(QUEUE_NAME, false, consumer);
        }
    }
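Result screenshots:



As shown above, when the message's routingKey is "goods.add", it matches both the key goods.add that consumer 1's queue is bound with and the key goods.# that consumer 2's queue is bound with, so both consumers receive the message.
If the producer's routingKey is then changed to "goods.delete", it no longer matches the key of consumer 1's queue, so consumer 1 cannot receive the message; the result is as follows: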


