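A message queue (MQ) is close to indispensable in today's development model. This post introduces RabbitMQ, one of the stronger products in the MQ space; like the well-known Spring framework, it belongs to VMware.
What MQ does
Benefits of MQ
- Application decoupling
- Asynchronous processing for faster responses
- Peak shaving (smoothing out traffic spikes)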
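Peak shaving is easier to see with numbers. The sketch below is not RabbitMQ code: it assumes a plain in-memory queue standing in for the broker, and the class name `PeakShavingSketch`, the method `drain`, and the message names are all made up for illustration. A burst is enqueued at once, then a consumer drains it at a fixed rate.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

class PeakShavingSketch {
    /**
     * Buffers a burst of messages in an in-memory queue and drains it at a
     * fixed rate. Returns the queue depth after each tick until it is empty.
     */
    static List<Integer> drain(int burstSize, int consumedPerTick) {
        if (consumedPerTick <= 0) {
            throw new IllegalArgumentException("consumedPerTick must be positive");
        }
        // The producer enqueues the whole burst and returns immediately.
        Queue<String> queue = new ArrayDeque<>();
        for (int i = 0; i < burstSize; i++) {
            queue.add("order-" + i);
        }
        // The consumer works through the backlog at its own pace.
        List<Integer> depthAfterEachTick = new ArrayList<>();
        while (!queue.isEmpty()) {
            for (int i = 0; i < consumedPerTick && !queue.isEmpty(); i++) {
                queue.poll();
            }
            depthAfterEachTick.add(queue.size());
        }
        return depthAfterEachTick;
    }

    public static void main(String[] args) {
        // A burst of 10 messages, consumed 4 per tick.
        System.out.println(drain(10, 4)); // prints [6, 2, 0]
    }
}
```

The producer side finishes as soon as the burst is enqueued, while the consumer never handles more than its fixed rate per tick; the queue absorbs the difference.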
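Drawbacks of MQ
- Lower system availability
- Higher system complexity
- Consistency problems
When to use it
- The producer does not need feedback from the consumer
- Brief inconsistency is tolerable
- You actually need the benefits MQ provides
Installing RabbitMQ
Install RabbitMQ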
# yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
# yum install epel-release
# yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl
# rpm -ivh esl-erlang_18.3-1_centos_7_amd64.rpm
# rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
# install the rabbitmq-server RPM itself the same way with rpm -ivh (version 3.6.13 in this walkthrough)
# default install location
/lib/rabbitmq/bin
Start RabbitMQ
# systemctl status rabbitmq-server
# systemctl start rabbitmq-server
# enable the management console
# rabbitmq-plugins enable rabbitmq_management
Add an administrator
# rabbitmqctl add_user admin 123456
# rabbitmqctl set_user_tags admin administrator
# rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
# rabbitmqctl list_permissions -p /
# rabbitmqctl list_users
http://192.168.56.81:15672/#/
Configuration files
# default configuration file
/etc/rabbitmq/rabbitmq.config
# default configuration file template
/usr/share/doc/rabbitmq-server-3.6.13/rabbitmq.config.example
Using RabbitMQ
Hello World mode
Producer
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
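/*
* 1. queue: queue name
* 2. durable: whether the queue survives a broker restart
* 3. exclusive: whether the queue is exclusive to this connection
* 4. autoDelete: whether the queue is deleted automatically once it is no longer in use
* 5. arguments: optional extra arguments
*/
channel.queueDeclare("hello", true, false, false, null);
/*
* 1. exchange: exchange name; "" is the default exchange
* 2. routingKey: routing key; with the default exchange it is the queue name
* 3. props: message properties
* 4. body: message payload
*/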
String body = "hello rabbitmq";
channel.basicPublish("", "hello", null, body.getBytes());
channel.close();
connection.close();
Consumer
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
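/*
* 1. queue: queue name
* 2. durable: whether the queue survives a broker restart
* 3. exclusive: whether the queue is exclusive to this connection
* 4. autoDelete: whether the queue is deleted automatically once it is no longer in use
* 5. arguments: optional extra arguments
*/
channel.queueDeclare("hello", true, false, false, null);
/*
* 1. queue: queue name
* 2. autoAck: whether messages are acknowledged automatically
* 3. callback: consumer callback object
*/
channel.basicConsume("hello", true, new DefaultConsumer(channel){
/*
* 1. consumerTag: consumer identifier
* 2. envelope: delivery metadata such as the exchange and routing key
* 3. properties: message properties
* 4. body: message payload
*/
@Override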
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("consumerTag: " + consumerTag);
System.out.println("Exchange: " + envelope.getExchange());
System.out.println("RoutingKey: " + envelope.getRoutingKey());
System.out.println("Properties: " + properties);
System.out.println("body: " + new String(body));
}
});
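// Do not close the channel or connection: the consumer has to keep waiting for messages.
Work Queues mode
C1 and C2 compete for messages: each message is delivered to only one of the two consumers.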
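To make the competition concrete, here is a minimal in-memory sketch, not RabbitMQ client code. It assumes the broker hands messages to consumers in turn (round-robin), which is the dispatch behaviour RabbitMQ's tutorials describe as the default; the class `WorkQueueSketch` and its `dispatch` method are hypothetical names used only here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class WorkQueueSketch {
    /** Hands each message to exactly one consumer, taking consumers in turn. */
    static Map<String, List<String>> dispatch(List<String> messages, List<String> consumers) {
        if (consumers.isEmpty()) {
            throw new IllegalArgumentException("at least one consumer is required");
        }
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (String consumer : consumers) {
            received.put(consumer, new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            // Message i goes to one consumer only; the next message goes to the next consumer.
            String consumer = consumers.get(i % consumers.size());
            received.get(consumer).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("hello rabbitmq" + i);
        }
        List<String> consumers = new ArrayList<>();
        consumers.add("C1");
        consumers.add("C2");
        Map<String, List<String>> received = dispatch(messages, consumers);
        System.out.println("C1: " + received.get("C1"));
        System.out.println("C2: " + received.get("C2"));
    }
}
```

With ten messages and two consumers, each consumer ends up with five messages and no message appears in both lists.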
Producer
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
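/*
* 1. queue: queue name
* 2. durable: whether the queue survives a broker restart
* 3. exclusive: whether the queue is exclusive to this connection
* 4. autoDelete: whether the queue is deleted automatically once it is no longer in use
* 5. arguments: optional extra arguments
*/
channel.queueDeclare("workqueues", true, false, false, null);
/*
* 1. exchange: exchange name; "" is the default exchange
* 2. routingKey: routing key; with the default exchange it is the queue name
* 3. props: message properties
* 4. body: message payload
*/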
for (int i = 0; i < 10; i++) {
String body = "hello rabbitmq" + i;
channel.basicPublish("", "workqueues", null, body.getBytes());
}
channel.close();
connection.close();
Consumer1 & Consumer2
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
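/*
* 1. queue: queue name
* 2. durable: whether the queue survives a broker restart
* 3. exclusive: whether the queue is exclusive to this connection
* 4. autoDelete: whether the queue is deleted automatically once it is no longer in use
* 5. arguments: optional extra arguments
*/
channel.queueDeclare("workqueues", true, false, false, null);
/*
* 1. queue: queue name
* 2. autoAck: whether messages are acknowledged automatically
* 3. callback: consumer callback object
*/
channel.basicConsume("workqueues", true, new DefaultConsumer(channel){
/*
* 1. consumerTag: consumer identifier
* 2. envelope: delivery metadata such as the exchange and routing key
* 3. properties: message properties
* 4. body: message payload
*/
@Override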
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body: " + new String(body));
}
});
// Do not close the channel or connection: the consumer has to keep waiting for messages.
PubSub mode
Producer
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
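/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
1. exchange: exchange name
2. type: exchange type: DIRECT("direct") routes by exact routing key, FANOUT("fanout") broadcasts, TOPIC("topic") matches by wildcard, HEADERS("headers") matches on header arguments
3. durable: whether the exchange survives a broker restart
4. autoDelete: whether the exchange is deleted automatically once it is no longer in use
5. internal: whether the exchange is for internal use only
6. arguments: optional extra arguments
*/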
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
/*
queueBind(String queue, String exchange, String routingKey)
1. queue: queue name
2. exchange: exchange name
3. routingKey: routing key (the binding rule); use "" when the exchange type is fanout
*/
channel.queueBind(queue1Name, exchangeName, "");
channel.queueBind(queue2Name, exchangeName, "");
channel.basicPublish(exchangeName, "", null, "hello rabbit".getBytes());
channel.close();
connection.close();
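The producer above binds two queues to a fanout exchange and publishes once. As a rough in-memory model of what that means (not RabbitMQ client code; `FanoutExchangeSketch` and its methods are hypothetical names for illustration), a fanout exchange copies every message to every bound queue and pays no attention to the routing key:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class FanoutExchangeSketch {
    // queue name -> messages currently held by that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to the exchange; a fanout binding needs no routing key. */
    void bind(String queue) {
        queues.putIfAbsent(queue, new ArrayList<>());
    }

    /** Copies the message to every bound queue; the routing key is ignored. */
    void publish(String routingKey, String body) {
        for (List<String> queue : queues.values()) {
            queue.add(body);
        }
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutExchangeSketch exchange = new FanoutExchangeSketch();
        exchange.bind("test_fanout_queue1");
        exchange.bind("test_fanout_queue2");
        exchange.publish("", "hello rabbit");
        System.out.println(exchange.messagesIn("test_fanout_queue1")); // prints [hello rabbit]
        System.out.println(exchange.messagesIn("test_fanout_queue2")); // prints [hello rabbit]
    }
}
```

This is why Consumer1 and Consumer2 below each read from their own queue yet both see the same message.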
Consumer1
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
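/*
* 1. queue: queue name
* 2. autoAck: whether messages are acknowledged automatically
* 3. callback: consumer callback object
*/
channel.basicConsume(queue1Name, true, new DefaultConsumer(channel){
/*
* 1. consumerTag: consumer identifier
* 2. envelope: delivery metadata such as the exchange and routing key
* 3. properties: message properties
* 4. body: message payload
*/
@Override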
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body: " + new String(body));
}
});
// Do not close the channel or connection: the consumer has to keep waiting for messages.
Consumer2
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
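/*
* 1. queue: queue name
* 2. autoAck: whether messages are acknowledged automatically
* 3. callback: consumer callback object
*/
channel.basicConsume(queue2Name, true, new DefaultConsumer(channel){
/*
* 1. consumerTag: consumer identifier
* 2. envelope: delivery metadata such as the exchange and routing key
* 3. properties: message properties
* 4. body: message payload
*/
@Override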
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body: " + new String(body));
}
});
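// Do not close the channel or connection: the consumer has to keep waiting for messages.
Routing mode
Binding a queue to the exchange requires a routing key; a message is forwarded to every queue whose binding key matches the message's routing key.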
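The matching rule can be modelled in a few lines. This is an in-memory sketch rather than RabbitMQ client code: `DirectExchangeSketch`, `bind`, and `route` are hypothetical names, and the bindings in `main` mirror the ones the producer in this section sets up.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class DirectExchangeSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, Set<String>> bindings = new HashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, key -> new TreeSet<>()).add(queue);
    }

    /** Returns the queues whose binding key equals the routing key exactly. */
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, Collections.emptySet());
    }

    public static void main(String[] args) {
        DirectExchangeSketch exchange = new DirectExchangeSketch();
        exchange.bind("test_direct_queue1", "error");
        exchange.bind("test_direct_queue2", "info");
        exchange.bind("test_direct_queue2", "error");
        exchange.bind("test_direct_queue2", "warning");

        System.out.println(exchange.route("info"));  // prints [test_direct_queue2]
        System.out.println(exchange.route("error")); // prints [test_direct_queue1, test_direct_queue2]
        System.out.println(exchange.route("debug")); // prints []
    }
}
```

A message published with the routing key info therefore reaches only the second queue, while an error message reaches both, and a key with no binding reaches none.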
Producer
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
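/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
1. exchange: exchange name
2. type: exchange type: DIRECT("direct") routes by exact routing key, FANOUT("fanout") broadcasts, TOPIC("topic") matches by wildcard, HEADERS("headers") matches on header arguments
3. durable: whether the exchange survives a broker restart
4. autoDelete: whether the exchange is deleted automatically once it is no longer in use
5. internal: whether the exchange is for internal use only
6. arguments: optional extra arguments
*/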
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
/*
queueBind(String queue, String exchange, String routingKey)
1. queue: queue name
2. exchange: exchange name
3. routingKey: routing key (the binding rule); use "" when the exchange type is fanout
*/
channel.queueBind(queue1Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "info");
channel.queueBind(queue2Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "warning");
channel.basicPublish(exchangeName, "info", null, "hello rabbit".getBytes());
channel.close();
connection.close();
Consumer1
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
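/*
* 1. queue: queue name
* 2. autoAck: whether messages are acknowledged automatically
* 3. callback: consumer callback object
*/
channel.basicConsume(queue1Name, true, new DefaultConsumer(channel){
/*
* 1. consumerTag: consumer identifier
* 2. envelope: delivery metadata such as the exchange and routing key
* 3. properties: message properties
* 4. body: message payload
*/
@Override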
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body: " + new String(body));
}
});
// Do not close the channel or connection: the consumer has to keep waiting for messages.
Consumer2
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
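/*
* 1. queue: queue name
* 2. autoAck: whether messages are acknowledged automatically
* 3. callback: consumer callback object
*/
channel.basicConsume(queue2Name, true, new DefaultConsumer(channel){
/*
* 1. consumerTag: consumer identifier
* 2. envelope: delivery metadata such as the exchange and routing key
* 3. properties: message properties
* 4. body: message payload
*/
@Override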
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body: " + new String(body));
}
});
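// Do not close the channel or connection: the consumer has to keep waiting for messages.
Topics mode
Routes are configured with wildcard patterns: in a binding key, * matches exactly one word and # matches zero or more words, with words separated by dots.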
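The wildcard rules are easy to get wrong, so here is a small stand-alone matcher that follows them as RabbitMQ documents them: words are separated by dots, * stands for exactly one word, and # stands for zero or more words. It is a sketch for reasoning about bindings, not the broker's actual implementation, and `TopicMatcherSketch` and `matches` are hypothetical names.

```java
class TopicMatcherSketch {
    /** Returns true if the routing key satisfies the binding pattern. */
    static boolean matches(String pattern, String routingKey) {
        return match(pattern.split("\\.", -1), 0, routingKey.split("\\.", -1), 0);
    }

    private static boolean match(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words, so try every possible split.
            for (int next = k; next <= key.length; next++) {
                if (match(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false; // pattern still expects a word but the key has none left
        }
        if (pattern[p].equals("*") || pattern[p].equals(key[k])) {
            return match(pattern, p + 1, key, k + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        String[] patterns = {"#.error", "order.*", "*.*"};
        String[] keys = {"order.info", "order1.info"};
        for (String key : keys) {
            for (String pattern : patterns) {
                System.out.println(key + " vs " + pattern + ": " + matches(pattern, key));
            }
        }
    }
}
```

Applied to the bindings used in this section, order.info satisfies order.* and *.*, so it reaches both queues, whereas order1.info satisfies only *.* and reaches the second queue alone.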
Producer
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
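/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
1. exchange: exchange name
2. type: exchange type: DIRECT("direct") routes by exact routing key, FANOUT("fanout") broadcasts, TOPIC("topic") matches by wildcard, HEADERS("headers") matches on header arguments
3. durable: whether the exchange survives a broker restart
4. autoDelete: whether the exchange is deleted automatically once it is no longer in use
5. internal: whether the exchange is for internal use only
6. arguments: optional extra arguments
*/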
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);
/*
queueBind(String queue, String exchange, String routingKey)
1. queue: queue name
2. exchange: exchange name
3. routingKey: routing key (the binding rule); use "" when the exchange type is fanout
*/
channel.queueBind(queue1Name, exchangeName, "#.error");
channel.queueBind(queue1Name, exchangeName, "order.*");
channel.queueBind(queue2Name, exchangeName, "*.*");
channel.basicPublish(exchangeName, "order.info", null, "hello rabbit".getBytes());
channel.basicPublish(exchangeName, "order1.info", null, "hello rabbit".getBytes());
channel.close();
connection.close();
Consumer1
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
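/*
* 1. queue: queue name
* 2. autoAck: whether messages are acknowledged automatically
* 3. callback: consumer callback object
*/
channel.basicConsume(queue1Name, true, new DefaultConsumer(channel){
/*
* 1. consumerTag: consumer identifier
* 2. envelope: delivery metadata such as the exchange and routing key
* 3. properties: message properties
* 4. body: message payload
*/
@Override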
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body: " + new String(body));
}
});
// Do not close the channel or connection: the consumer has to keep waiting for messages.
Consumer2
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
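/*
* 1. queue: queue name
* 2. autoAck: whether messages are acknowledged automatically
* 3. callback: consumer callback object
*/
channel.basicConsume(queue2Name, true, new DefaultConsumer(channel){
/*
* 1. consumerTag: consumer identifier
* 2. envelope: delivery metadata such as the exchange and routing key
* 3. properties: message properties
* 4. body: message payload
*/
@Override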
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body: " + new String(body));
}
});
// Do not close the channel or connection: the consumer has to keep waiting for messages.