RabbitMQ的五种队列

RabbitMQ五种队列分别是 :

  • 简单队列
  • 工作模式
  • 订阅模式
  • 路由模式
  • 主题模式

1.简单队列

模型 :

P : 消息的生产者

红色 : 队列

C : 消息的消费者

生产者将消息发送到队列,消费者从队列中获取消息。

示例(创建maven工程) :

1.1 导入依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
<dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/log4j/log4j -->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-log4j12 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.25</version>
</dependency>
</dependencies>

1.2 编写获取MQ连接的工具类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
package com.self.rabbitmq.utils;

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class ConnectionUtil {

/**
* 获取MQ的连接
* @return
*/
public static Connection getConnection() throws IOException, TimeoutException {

//定义一个连接工厂
ConnectionFactory factory = new ConnectionFactory();

//设置服务器地址,AMQP端口号,选择vhost,用户名密码
factory.setHost("localhost");
factory.setPort(5672);
factory.setVirtualHost("host100");
factory.setUsername("admin");
factory.setPassword("admin");

Connection connection = factory.newConnection();
return connection;
}
}

1.3 生产者发送消息到消息队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.self.rabbitmq.simple;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.self.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 简单队列的生产者
*/
public class SimpleProducer {

private static final String QUEUE_NAME = "simple_QUEUE";

public static void main(String[] args) throws IOException, TimeoutException {

//获取连接
Connection connection = ConnectionUtil.getConnection();

//从连接中创建通道
Channel channel = connection.createChannel();

//创建队列,生产者发送消息到队列,消费者从队列中获取消息;
boolean durable = false;
boolean exclusive = false;
boolean autoDelete = false;
channel.queueDeclare(QUEUE_NAME,durable,exclusive,autoDelete,null);

String msg = "Hello Simple QUEUE!";

channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
System.out.println("------Producer send msg : " + msg);

channel.close();
connection.close();


}
}

运行main程序,后台输出:

观察RabbitMQ服务器 发现出现了一个队列:

手动从队列中获取消息 (点击队列名称进入队列):

通过代码让消费者获取消息 :

1.4 消费者从队列中获取消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package com.self.rabbitmq.simple;

import com.rabbitmq.client.*;
import com.self.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;

/**
* 简单队列的消费者
*/
public class SimpleConsumer {

private static final String QUEUE_NAME = "simple_QUEUE";

public static void main(String[] args) throws Exception{


//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();

//创建队列,这句在生产者中已经写,在这里其实不用写
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//声明consumer对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
//handleDelivery的作用 : 一旦有消息进入队列,就会触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("[x] consumer '" + message + "'");
}
};

//监听队列
//true : autoAck 是否自动应答
channel.basicConsume(QUEUE_NAME,true,consumer);
}
}

1.5 上述方法的解析

1
channel.queueDeclare(QUEUE_NAME,durable,exclusive,autoDelete,Map<String,Object> arguments);

QUEUE_NAME : 队列名;

durable : 是否持久化,就是说在服务器重启时,能否存活;

exclusive : 是否是当前连接的专用队列;

autoDelete : 没有消费者时,是否自动删除;

arguments : 参数;

1
channel.basicPublish(String exchange,QUEUE_NAME,basicProperties,byte[] body);

exchange : 交换机

QUEUE_NAME : 队列名

basicProperties : 属性

byte[] body : 消息;

1
2
3
4
5
6
7
8
//声明consumer对象
DefaultConsumer consumer = new DefaultConsumer(channel) {
//handleDelivery的作用 : 一旦有消息进入队列,就会触发这个方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
//hadleDelivery中的body就是队列中的消息;
}
};
1
2
3
//监听队列
//true : autoAck 是否自动应答
channel.basicConsume(QUEUE_NAME,true,consumer);

1.6 简单队列的不足

耦合性高,生产消费一一对应(如果有多个消费者想都消费这个消息,就不行了),队列名称变更时需要同时更改。

2. 工作模式

模型 :

一个生产者、两个消费者

一个消息只能被一个消费者获取。

分为两种方式 : Round-robin(轮询分发)和Fair dispatch(公平分发)

2.1 轮询分发

生产者 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.self.rabbitmq.work;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.self.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 工作模式的生产者
*/
public class WorkProducer {

private static final String QUEUE_NAME = "work_QUEUE";

/** |---C1
* P---------QUEUE-----|
* |---C2
* @param args
* @throws IOException
* @throws TimeoutException
*/
public static void main(String[] args) throws IOException, TimeoutException {

//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();

//创建队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

for (int i = 0; i < 50; i++) {

String msg = "hello :" + i;
System.out.println("[WQ] producer : " + msg);

channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
try {
Thread.sleep(i*20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

channel.close();
connection.close();

}
}

消费者1 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
package com.self.rabbitmq.work;

import com.rabbitmq.client.*;
import com.self.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 工作模式的消费者1
*/
public class WorkConsumer1 {

private static final String QUEUE_NAME = "work_QUEUE";

public static void main(String[] args) throws IOException, TimeoutException {

//获取连接
Connection connection = ConnectionUtil.getConnection();

//创建通道
Channel channel = connection.createChannel();

//创建队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//队列中一旦有消息,就会触发此方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("[1] Consumer get : " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
}
}
};

boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}

消费者 2 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
package com.self.rabbitmq.work;

import com.rabbitmq.client.*;
import com.self.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 工作模式的消费者2
*/
public class WorkConsumer2 {

private static final String QUEUE_NAME = "work_QUEUE";

public static void main(String[] args) throws IOException, TimeoutException {

//获取连接
Connection connection = ConnectionUtil.getConnection();

//创建通道
Channel channel = connection.createChannel();

//创建队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//队列中有消息时,触发该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body,"UTF-8");
System.out.println("[2] consumer get : " + message);

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
}
}
};

boolean autoAck = true;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}

结果截图如下 :

结论 :

生产者将消息发送完成后,两个消费者获取的消息数是一样的,但是消息内容不一样;

这是一种轮询分发策略,就是不管谁忙着或谁清闲,都不会多给一个消息,任务消息是你一个我一个的发送;

2.2 公平分发

使用公平分发的两个条件 :

  • 使用chanel.basicQos(perfetch = 1) : 限制发送给同一个消费者不得超过一条消息,每次只处理一条消息;
  • 消费者方要关闭自动应答,改为手动应答;
    • chanel.basicQos(1);
    • chanel.basicAck(envelope.getDeliveryTag,false);
    • boolean autoAck = false;

生产者 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
package com.self.rabbitmq.workfair;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.self.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 工作模式的生产者
*/
public class WorkProducer {

private static final String QUEUE_NAME = "work_QUEUE";

/** |---C1
* P---------QUEUE-----|
* |---C2
* @param args
* @throws IOException
* @throws TimeoutException
*/
public static void main(String[] args) throws IOException, TimeoutException {

//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();

//创建队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

/**
* 每一个消费者发送确认消息之前,消费队列不发送下一个消息给消费者,一次只处理一个消息
*
* 限制发送给同一个消费者不得超过一条消息
*
*/
int perfetchCount = 1;
channel.basicQos(perfetchCount);

for (int i = 0; i < 50; i++) {

String msg = "hello :" + i;
System.out.println("[WQ] producer : " + msg);

channel.basicPublish("",QUEUE_NAME,null,msg.getBytes());
try {
Thread.sleep(i*20);
} catch (InterruptedException e) {
e.printStackTrace();
}
}

channel.close();
connection.close();

}
}

消费者1 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
package com.self.rabbitmq.workfair;

import com.rabbitmq.client.*;
import com.self.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 工作模式的消费者1
*/
public class WorkConsumer1 {

private static final String QUEUE_NAME = "work_QUEUE";

public static void main(String[] args) throws IOException, TimeoutException {

//获取连接
Connection connection = ConnectionUtil.getConnection();

//创建通道
final Channel channel = connection.createChannel();

//创建队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

channel.basicQos(1);//保证一次只分发一个

//定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//队列中一旦有消息,就会触发此方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "utf-8");
System.out.println("[1] Consumer get : " + message);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");

channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};

boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}

消费者2:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.self.rabbitmq.workfair;

import com.rabbitmq.client.*;
import com.self.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 工作模式的消费者2
*/
public class WorkConsumer2 {

private static final String QUEUE_NAME = "work_QUEUE";

public static void main(String[] args) throws IOException, TimeoutException {

//获取连接
Connection connection = ConnectionUtil.getConnection();

//创建通道
final Channel channel = connection.createChannel();

//创建队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

channel.basicQos(1);//保证一次只处理一个消息

//定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//队列中有消息时,触发该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String message = new String(body,"UTF-8");
System.out.println("[2] consumer get : " + message);

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");

//手动应答
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};

boolean autoAck = false;
channel.basicConsume(QUEUE_NAME,autoAck,consumer);
}
}

结果截图 :

结论 :

生产者将消息发送完成后,消费者1获取的消息数量较少,消费者2获取的消息数量较多;

简单来说,就是能者多劳,公平分发;

3.订阅模式

我们之前学习的都是一个消息只能被一个消费者消费,那么如果我想发一个消息 能被多个消费者消费,这该怎么办?这时候我们就得用消息中的订阅模型 :

解读 :

  1. 1个生产者,多个消费者;
  2. 每一个消费者都有自己的一个队列;
  3. 生产者没有将消息直接发送到队列,而是发送到了交换机;
  4. 每一个队列都要与交换机进行绑定;
  5. 生产者发送消息,经过交换机,到达队列,实现一个消息被多个消费者获取的目的;

生产者 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.self.rabbitmq.subscribe;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.self.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 订阅模式生产者
*/
public class SubscribeProducer {

private static final String EXCHANGE_NAME = "exchange_fanout";

public static void main(String[] args) throws IOException, TimeoutException {

//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();

//声明交换机 fanout表示分发
channel.exchangeDeclare(EXCHANGE_NAME,"fanout");

//发送消息
String msg = "hello subscribe";

channel.basicPublish(EXCHANGE_NAME,"",null,msg.getBytes());
System.out.println("subscribe : " + msg);

channel.close();
connection.close();


}
}

输出结果 :

观察服务器可以发现存在一个交换机 :

注意 : 交换机并没有存储数据的能力,所以在没有队列和交换机绑定时,交换机接收到的消息丢失。

消费者1 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.self.rabbitmq.subscribe;

import com.rabbitmq.client.*;
import com.self.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 订阅模式的消费者1
*/
public class SubscribeConsumer1 {

private static final String QUEUE_NAME = "subscribe_QUEUE_1";
private static final String EXCHANGE_NAME = "exchange_fanout";

public static void main(String[] args) throws IOException, TimeoutException {

//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
final Channel channel = connection.createChannel();

//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//将队列跟交换机进行绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

//保证消费者一次只能获取一个消息
channel.basicQos(1);

//定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
//队列上有消息就触发此方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

String msg = new String(body,"utf-8");
System.out.println("[1] subscribeConsumer : " + msg);

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};

//监听队列
channel.basicConsume(QUEUE_NAME,false,consumer);



}
}

消费者2 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.self.rabbitmq.subscribe;

import com.rabbitmq.client.*;
import com.self.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 订阅模式的消费者2
*/
public class SubscribeConsumer2 {

private static final String QUEUE_NAME = "subscribe_QUEUE_2";
private static final String EXCHANGE_NAME = "exchange_fanout";

public static void main(String[] args) throws IOException, TimeoutException {

//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
final Channel channel = connection.createChannel();

//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//将队列与交换机进行绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"");

//保证消费者一次只能获取一条消息
channel.basicQos(1);

//定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"UTF-8");
System.out.println("[2] subscribeConsumer : " + msg);

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};

//监听队列
channel.basicConsume(QUEUE_NAME,false,consumer);
}

}

结果截图 :

查看RabbitMQ服务器页面可知j交换机绑定了两个队列 :

结论 :

将消息发送到交换机,队列与交换机进行绑定,并且每一个消费者对应一个队列,所以两个消费者都可以获取生产者发送给交换机的消息。

4. 路由模式

模型 :

解读 :

交换机(exchange) 和队列都会拥有一个路由键(routingKey),如果交换机的路由键和队列的路由键相匹配时,交换机才会将消息放入队列中。

(error或info等就是routingKey)

生产者 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
package com.self.rabbitmq.routing;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.self.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 路由模型的生产者
*/
public class RoutingProducer {

private static final String EXCHANGE_NAME = "exchange_direct";

public static void main(String[] args) throws IOException, TimeoutException {

//获取连接
Connection connection = ConnectionUtil.getConnection();

//创建通道
Channel channel = connection.createChannel();

//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"direct");

String msg = "hello Routing";

String routingKey = "error";
channel.basicPublish(EXCHANGE_NAME,routingKey,null,msg.getBytes());

System.out.println("RoutingProducer : " + msg);

channel.close();
connection.close();
}
}

消费者1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.self.rabbitmq.routing;

import com.rabbitmq.client.*;
import com.self.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 路由模式的消费者1
*/
public class RoutingConsumer1 {

private static final String EXCHANGE_NAME = "exchange_direct";
private static final String QUEUE_NAME = "routing_QUEUE_1";

public static void main(String[] args) throws IOException, TimeoutException {

//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
final Channel channel = connection.createChannel();

//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");

channel.basicQos(1);

//定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"UTF-8");
System.out.println("[1] routingConsumer : " + msg);

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};

//监听队列
channel.basicConsume(QUEUE_NAME,false,consumer);



}
}

消费者2 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
package com.self.rabbitmq.routing;

import com.rabbitmq.client.*;
import com.self.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 路由模式的消费者2
*/
public class RoutingConsumer2 {

private static final String QUEUE_NAME = "routing_QUEUE_2";
private static final String EXCHANGE_NAME = "exchange_direct";

public static void main(String[] args) throws IOException, TimeoutException {

//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
final Channel channel = connection.createChannel();

//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//将队列与交换机进行绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"error");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"info");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"warning");

//消费者每一次只能获取一条信息
channel.basicQos(1);

//定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel) {

@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"utf-8");
System.out.println("[2] RoutingConsumer : " + msg);

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
//手动回执
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};

//监听队列
channel.basicConsume(QUEUE_NAME,false,consumer);

}
}

结果截图 :

由输出结果可知,当路由键设置成error时,两个消费者都可以获取到消息;

而当将生产者中的routingKey更改为info时,消费者1获取不到消息,只有将队列中的routingKey绑定有info发消费者2才能获取到消息。

1
String routingKey = "info";

结论:路由模式中,交换机的routingKey和队列的routingKey是存在有对应关系的,可以在队列中绑定多个routingKey,而只有交换机和队列中的routingKey相互匹配才能获取消息。

5. 主题模式

模型 :

将路由键与某模式进行匹配

  • #表示匹配一个或多个
  • *表示匹配一个

生产者 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.self.rabbitmq.topic;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.self.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 主题模式的生产者
*/
public class TopicProducer {

private static final String EXCHANGE_NAME = "topic_EXCHANGE";

public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
Channel channel = connection.createChannel();

//声明交换机
channel.exchangeDeclare(EXCHANGE_NAME,"topic");

String msg = "商品...";

String routingKey = "goods.add";

//发送消息给交换机
channel.basicPublish(EXCHANGE_NAME, routingKey,null,msg.getBytes());
System.out.println("TopicProducer : " + msg);

channel.close();
connection.close();
}
}

消费者1 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.self.rabbitmq.topic;

import com.rabbitmq.client.*;
import com.self.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 主题模式的消费者1
*/
public class TopicConsumer1 {

private static final String QUEUE_NAME = "topic_QUEUE_1";
private static final String EXCHANGE_NAME = "topic_EXCHANGE";

public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
final Channel channel = connection.createChannel();

//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//将队列与交换机进行绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.add");

channel.basicQos(1);

//定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"UTF-8");
System.out.println("[1] topicConsumer : " + msg);

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[1] done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};

//监听队列
channel.basicConsume(QUEUE_NAME,false,consumer);



}
}

消费者 2 :

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.self.rabbitmq.topic;

import com.rabbitmq.client.*;
import com.self.rabbitmq.utils.ConnectionUtil;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
* 主题模式的消费者1
*/
public class TopicConsumer2 {

private static final String QUEUE_NAME = "topic_QUEUE_2";
private static final String EXCHANGE_NAME = "topic_EXCHANGE";

public static void main(String[] args) throws IOException, TimeoutException {
//获取连接
Connection connection = ConnectionUtil.getConnection();
//创建通道
final Channel channel = connection.createChannel();

//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);

//将队列与交换机进行绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"goods.#");

channel.basicQos(1);

//定义一个消费者
DefaultConsumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg = new String(body,"UTF-8");
System.out.println("[2] topicConsumer : " + msg);

try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("[2] done");
channel.basicAck(envelope.getDeliveryTag(),false);
}
}
};

//监听队列
channel.basicConsume(QUEUE_NAME,false,consumer);



}
}

结果截图 :

由以上可以知道,当交换机中的routingKey = “goods.add”时,消费者1的队列中的routingKey = goods.add和消费者2的队列中的routingKey = goods.# 都是可以相互匹配的,所以两个消费者均可获取消息。

然后将生产者中的routingKey改成 routingKey = “good.delete时,消费者1的队列中的routingKey将没办法与交换机中的routingKey相匹配,所以消费者1没法获取消息,结果如下: