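Message queue (MQ) middleware has become close to indispensable in today's development practice. This post introduces one of the standout products in the MQ space, RabbitMQ, which, like the renowned Spring framework, belongs to VMware.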

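What MQ Is For

Benefits of MQ

  • Application decoupling
  • Asynchronous processing for faster responses
  • Peak shaving (load leveling)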
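
To make asynchronous hand-off and peak shaving more concrete, here is a minimal in-memory sketch. It uses a `java.util.concurrent.LinkedBlockingQueue` as a stand-in for a broker, which is purely an assumption for illustration; the class name `PeakShavingDemo`, the method `run`, and the 2 ms processing delay are all hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical in-memory illustration; a real system would use a broker such as RabbitMQ.
class PeakShavingDemo {

    /** Publishes a burst of tasks at once and lets one slow consumer drain them. */
    static int run(int burstSize) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();

        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < burstSize; i++) {
                    queue.take();        // blocks until a task is available
                    Thread.sleep(2);     // simulate slow downstream work
                    processed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();

        // The producer hands off the whole burst without waiting for any task to finish.
        for (int i = 0; i < burstSize; i++) {
            queue.add(i);
        }

        try {
            consumer.join();             // wait only so the demo can report a total
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        System.out.println("processed: " + run(20));
    }
}
```

The producer loop finishes almost immediately while the consumer works through the backlog at its own pace, which is the effect a broker gives you across processes.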

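Drawbacks of MQ

  • Reduced system availability (the broker is one more component that can fail)
  • Increased system complexity
  • Consistency issues

When to Use It

  • The producer does not need feedback from the consumer
  • Brief inconsistency is tolerable
  • You actually need the benefits MQ provides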

Installing RabbitMQ

Install RabbitMQ

# yum install build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz
# yum install epel-release
# yum install unixODBC unixODBC-devel wxBase wxGTK SDL wxGTK-gl
# rpm -ivh esl-erlang_18.3-1_centos_7_amd64.rpm
# rpm -ivh socat-1.7.3.2-2.el7.x86_64.rpm
# Then install the rabbitmq-server RPM itself (version 3.6.13 in this setup)

# Default install location
/lib/rabbitmq/bin

Start RabbitMQ

# systemctl start rabbitmq-server
# systemctl status rabbitmq-server
# Enable the management console plugin
# rabbitmq-plugins enable rabbitmq_management

Add an Administrator

# rabbitmqctl add_user admin 123456
# rabbitmqctl set_user_tags admin administrator
# rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
# rabbitmqctl list_permissions -p /
# rabbitmqctl list_users

Configuration Files

# Default configuration file
/etc/rabbitmq/rabbitmq.config

# Default configuration file template
/usr/share/doc/rabbitmq-server-3.6.13/rabbitmq.config.example

Using RabbitMQ

Hello World Mode

Producer -> Queue -> Consumer: send and receive messages through a named queue.

Producer

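import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

ConnectionFactory factory = new ConnectionFactory();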
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

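/*
 *  1. queue: queue name
 *  2. durable: whether the queue survives a broker restart
 *  3. exclusive: whether the queue is restricted to this connection
 *  4. autoDelete: whether the queue is deleted once its last consumer unsubscribes
 *  5. arguments: optional queue arguments
 */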
channel.queueDeclare("hello", true, false, false, null);


/*
 *  1. exchange: exchange name; "" is the default exchange
 *  2. routingKey: routing key; with the default exchange this is the target queue name
 *  3. props: message properties
 *  4. body: message payload
 */
String body = "hello rabbitmq";
channel.basicPublish("", "hello", null, body.getBytes());

channel.close();
connection.close();

Consumer

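import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;

ConnectionFactory factory = new ConnectionFactory();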
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

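/*
 *  1. queue: queue name
 *  2. durable: whether the queue survives a broker restart
 *  3. exclusive: whether the queue is restricted to this connection
 *  4. autoDelete: whether the queue is deleted once its last consumer unsubscribes
 *  5. arguments: optional queue arguments
 */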
channel.queueDeclare("hello", true, false, false, null);


/*
 *  1. queue: queue name
 *  2. autoAck: whether deliveries are acknowledged automatically
 *  3. callback: consumer callback object
 */
channel.basicConsume("hello", true, new DefaultConsumer(channel){
  /*
   *  1. consumerTag: tag identifying this consumer
   *  2. envelope: delivery metadata (exchange, routing key, delivery tag)
   *  3. properties: message properties
   *  4. body: message payload
   */
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("consumerTag: " + consumerTag);
    System.out.println("Exchange: " + envelope.getExchange());
    System.out.println("RoutingKey: " + envelope.getRoutingKey());
    System.out.println("Properties: " + properties);
    System.out.println("body: " + new String(body));
  }
});

// Do not close the channel or connection here: the consumer must keep waiting for messages

Work Queues Mode

Producer -> Queue -> Consumers: a work queue distributes time-consuming tasks among multiple workers.

Consumers C1 and C2 compete for messages: each message is delivered to only one of them.
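
By default the broker hands messages to competing consumers in round-robin order. The sketch below only simulates that dispatch in plain Java, without a broker or the RabbitMQ client; `RoundRobinDemo` and `dispatch` are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation of round-robin dispatch; no RabbitMQ client involved.
class RoundRobinDemo {

    /** Assigns message i to consumer (i mod consumerCount). */
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> received = new ArrayList<>();
        for (int c = 0; c < consumerCount; c++) {
            received.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            received.get(i % consumerCount).add(messages.get(i));
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> messages = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            messages.add("hello rabbitmq" + i);
        }
        List<List<String>> received = dispatch(messages, 2);
        System.out.println("C1: " + received.get(0));
        System.out.println("C2: " + received.get(1));
    }
}
```

With ten messages and two consumers, C1 ends up with the even-numbered messages and C2 with the odd-numbered ones; no message reaches both.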

Producer

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

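/*
 *  1. queue: queue name
 *  2. durable: whether the queue survives a broker restart
 *  3. exclusive: whether the queue is restricted to this connection
 *  4. autoDelete: whether the queue is deleted once its last consumer unsubscribes
 *  5. arguments: optional queue arguments
 */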
channel.queueDeclare("workqueues", true, false, false, null);


/*
 *  1. exchange: exchange name; "" is the default exchange
 *  2. routingKey: routing key; with the default exchange this is the target queue name
 *  3. props: message properties
 *  4. body: message payload
 */
for (int i = 0; i < 10; i++) {
  String body = "hello rabbitmq" + i;
  channel.basicPublish("", "workqueues", null, body.getBytes());
}

channel.close();
connection.close();

Consumer1 & Consumer2

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

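/*
 *  1. queue: queue name
 *  2. durable: whether the queue survives a broker restart
 *  3. exclusive: whether the queue is restricted to this connection
 *  4. autoDelete: whether the queue is deleted once its last consumer unsubscribes
 *  5. arguments: optional queue arguments
 */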
channel.queueDeclare("workqueues", true, false, false, null);


/*
 *  1. queue: queue name
 *  2. autoAck: whether deliveries are acknowledged automatically
 *  3. callback: consumer callback object
 */
channel.basicConsume("workqueues", true, new DefaultConsumer(channel){
  /*
   *  1. consumerTag: tag identifying this consumer
   *  2. envelope: delivery metadata (exchange, routing key, delivery tag)
   *  3. properties: message properties
   *  4. body: message payload
   */
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("body: " + new String(body));
  }
});

// Do not close the channel or connection here: the consumer must keep waiting for messages

Publish/Subscribe Mode

Producer -> Exchange -> Queues -> Consumers: deliver a message to multiple consumers. This pattern is known as publish/subscribe.
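
A fanout exchange copies every message to all queues bound to it and ignores the routing key. The following is a minimal in-memory model of that behaviour, not the RabbitMQ client API; `FanoutDemo`, `bind`, `publish`, and `messagesIn` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange; not the RabbitMQ client API.
class FanoutDemo {
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    /** Binds a queue to the exchange; a fanout binding needs no routing key. */
    void bind(String queueName) {
        queues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Copies the message into every bound queue, ignoring the routing key. */
    void publish(String routingKey, String message) {
        for (List<String> queue : queues.values()) {
            queue.add(message);
        }
    }

    /** Returns the messages currently held by the named queue. */
    List<String> messagesIn(String queueName) {
        return queues.getOrDefault(queueName, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutDemo exchange = new FanoutDemo();
        exchange.bind("test_fanout_queue1");
        exchange.bind("test_fanout_queue2");
        exchange.publish("", "hello rabbit");
        System.out.println(exchange.messagesIn("test_fanout_queue1"));
        System.out.println(exchange.messagesIn("test_fanout_queue2"));
    }
}
```

Both queues hold their own copy of the message, so a consumer on each queue receives it independently.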


Producer

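import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

ConnectionFactory factory = new ConnectionFactory();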
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

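/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
1. exchange: exchange name
2. type: exchange type: DIRECT("direct") exact match, FANOUT("fanout") broadcast, TOPIC("topic") wildcard match, HEADERS("headers") header-attribute match
3. durable: whether the exchange survives a broker restart
4. autoDelete: whether the exchange is deleted once it is no longer in use
5. internal: whether the exchange is internal (clients cannot publish to it directly)
6. arguments: optional exchange arguments
*/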
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, true, false, false, null);

String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);

/*
queueBind(String queue, String exchange, String routingKey)
1. queue: queue name
2. exchange: exchange name
3. routingKey: binding key; a fanout exchange ignores it, so "" is used
*/
channel.queueBind(queue1Name, exchangeName, "");
channel.queueBind(queue2Name, exchangeName, "");


channel.basicPublish(exchangeName, "", null, "hello rabbit".getBytes());


channel.close();
connection.close();

Consumer1

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";

/*
 *  1. queue: queue name
 *  2. autoAck: whether deliveries are acknowledged automatically
 *  3. callback: consumer callback object
 */
channel.basicConsume(queue1Name, true, new DefaultConsumer(channel){
  /*
   *  1. consumerTag: tag identifying this consumer
   *  2. envelope: delivery metadata (exchange, routing key, delivery tag)
   *  3. properties: message properties
   *  4. body: message payload
   */
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("body: " + new String(body));
  }
});

// Do not close the channel or connection here: the consumer must keep waiting for messages

Consumer2

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";

/*
 *  1. queue: queue name
 *  2. autoAck: whether deliveries are acknowledged automatically
 *  3. callback: consumer callback object
 */
channel.basicConsume(queue2Name, true, new DefaultConsumer(channel){
  /*
   *  1. consumerTag: tag identifying this consumer
   *  2. envelope: delivery metadata (exchange, routing key, delivery tag)
   *  3. properties: message properties
   *  4. body: message payload
   */
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("body: " + new String(body));
  }
});

// Do not close the channel or connection here: the consumer must keep waiting for messages

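Routing Mode

Producer -> Exchange -> Queues -> Consumers: subscribe to only a subset of the messages.

When binding a queue to the exchange you must specify a routing key; a message is forwarded only to the queues whose binding key matches the message's routing key.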
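
The exact-match rule of a direct exchange can be sketched as a lookup from binding key to queues. This is an in-memory model for illustration only, not the RabbitMQ client API; `DirectRoutingDemo`, `bind`, and `route` are hypothetical names, while the queue names and keys are the ones used in the producer code of this section.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of direct-exchange routing; not the RabbitMQ client API.
class DirectRoutingDemo {
    // binding key -> queues bound with that key
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    void bind(String queueName, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queueName);
    }

    /** Returns the queues whose binding key equals the routing key exactly. */
    Set<String> route(String routingKey) {
        return bindings.getOrDefault(routingKey, new LinkedHashSet<>());
    }

    public static void main(String[] args) {
        DirectRoutingDemo exchange = new DirectRoutingDemo();
        exchange.bind("test_direct_queue1", "error");
        exchange.bind("test_direct_queue2", "info");
        exchange.bind("test_direct_queue2", "error");
        exchange.bind("test_direct_queue2", "warning");

        System.out.println("info  -> " + exchange.route("info"));
        System.out.println("error -> " + exchange.route("error"));
        System.out.println("debug -> " + exchange.route("debug"));
    }
}
```

In this model `info` reaches only `test_direct_queue2`, `error` reaches both queues, and `debug` reaches none because no binding uses that key.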

Producer

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

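/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
1. exchange: exchange name
2. type: exchange type: DIRECT("direct") exact match, FANOUT("fanout") broadcast, TOPIC("topic") wildcard match, HEADERS("headers") header-attribute match
3. durable: whether the exchange survives a broker restart
4. autoDelete: whether the exchange is deleted once it is no longer in use
5. internal: whether the exchange is internal (clients cannot publish to it directly)
6. arguments: optional exchange arguments
*/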
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, true, false, false, null);

String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);

/*
queueBind(String queue, String exchange, String routingKey)
1. queue: queue name
2. exchange: exchange name
3. routingKey: binding key; a direct exchange delivers a message only to queues whose binding key equals the message's routing key
*/
channel.queueBind(queue1Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "info");
channel.queueBind(queue2Name, exchangeName, "error");
channel.queueBind(queue2Name, exchangeName, "warning");


channel.basicPublish(exchangeName, "info", null, "hello rabbit".getBytes());


channel.close();
connection.close();

Consumer1

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";

/*
 *  1. queue: queue name
 *  2. autoAck: whether deliveries are acknowledged automatically
 *  3. callback: consumer callback object
 */
channel.basicConsume(queue1Name, true, new DefaultConsumer(channel){
  /*
   *  1. consumerTag: tag identifying this consumer
   *  2. envelope: delivery metadata (exchange, routing key, delivery tag)
   *  3. properties: message properties
   *  4. body: message payload
   */
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("body: " + new String(body));
  }
});

// Do not close the channel or connection here: the consumer must keep waiting for messages

Consumer2

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";

/*
 *  1. queue: queue name
 *  2. autoAck: whether deliveries are acknowledged automatically
 *  3. callback: consumer callback object
 */
channel.basicConsume(queue2Name, true, new DefaultConsumer(channel){
  /*
   *  1. consumerTag: tag identifying this consumer
   *  2. envelope: delivery metadata (exchange, routing key, delivery tag)
   *  3. properties: message properties
   *  4. body: message payload
   */
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("body: " + new String(body));
  }
});

// Do not close the channel or connection here: the consumer must keep waiting for messages

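Topics Mode

Producer -> Exchange -> Queues -> Consumers: receive messages based on a pattern (topics).

Routes are configured with wildcard patterns: in a binding key, * matches exactly one dot-separated word and # matches zero or more words.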
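
To see which routing keys a wildcard binding accepts, it can help to re-implement the matching rule in a few lines. The sketch below follows the AMQP topic convention (`*` is exactly one word, `#` is zero or more words); it is an illustration only, since the broker performs this matching itself, and `TopicMatchDemo`, `matches`, and `anyMatch` are hypothetical names. The binding keys in `main` are the ones used in the producer code of this section.

```java
// Hypothetical re-implementation of topic matching for illustration; the broker does this itself.
class TopicMatchDemo {

    /** Returns true if the routing key satisfies the binding key pattern. */
    static boolean matches(String bindingKey, String routingKey) {
        return match(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
    }

    private static boolean match(String[] pattern, int p, String[] words, int w) {
        if (p == pattern.length) {
            return w == words.length;      // pattern used up: all words must be used up too
        }
        if (pattern[p].equals("#")) {
            // '#' absorbs zero or more words: try every possible remainder
            for (int next = w; next <= words.length; next++) {
                if (match(pattern, p + 1, words, next)) {
                    return true;
                }
            }
            return false;
        }
        if (w == words.length) {
            return false;                  // pattern still expects a word
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(words[w]);
        return wordMatches && match(pattern, p + 1, words, w + 1);
    }

    /** Returns true if at least one of the queue's binding keys matches. */
    static boolean anyMatch(String[] bindingKeys, String routingKey) {
        for (String key : bindingKeys) {
            if (matches(key, routingKey)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String[] queue1Keys = {"#.error", "order.*"};
        String[] queue2Keys = {"*.*"};
        for (String routingKey : new String[] {"order.info", "order1.info"}) {
            System.out.println(routingKey
                    + " -> queue1=" + anyMatch(queue1Keys, routingKey)
                    + ", queue2=" + anyMatch(queue2Keys, routingKey));
        }
    }
}
```

Under these rules `order.info` lands in both queues (via `order.*` and `*.*`), while `order1.info` lands only in the second queue.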

Producer

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

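/*
exchangeDeclare(String exchange, BuiltinExchangeType type, boolean durable, boolean autoDelete, boolean internal, Map<String, Object> arguments)
1. exchange: exchange name
2. type: exchange type: DIRECT("direct") exact match, FANOUT("fanout") broadcast, TOPIC("topic") wildcard match, HEADERS("headers") header-attribute match
3. durable: whether the exchange survives a broker restart
4. autoDelete: whether the exchange is deleted once it is no longer in use
5. internal: whether the exchange is internal (clients cannot publish to it directly)
6. arguments: optional exchange arguments
*/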
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC, true, false, false, null);

String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name, true, false, false, null);
channel.queueDeclare(queue2Name, true, false, false, null);

/*
queueBind(String queue, String exchange, String routingKey)
1. queue: queue name
2. exchange: exchange name
3. routingKey: binding key pattern; "*" matches exactly one word, "#" matches zero or more words
*/
channel.queueBind(queue1Name, exchangeName, "#.error");
channel.queueBind(queue1Name, exchangeName, "order.*");
channel.queueBind(queue2Name, exchangeName, "*.*");


channel.basicPublish(exchangeName, "order.info", null, "hello rabbit".getBytes());
channel.basicPublish(exchangeName, "order1.info", null, "hello rabbit".getBytes());


channel.close();
connection.close();

Consumer1

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";

/*
 *  1. queue: queue name
 *  2. autoAck: whether deliveries are acknowledged automatically
 *  3. callback: consumer callback object
 */
channel.basicConsume(queue1Name, true, new DefaultConsumer(channel){
  /*
   *  1. consumerTag: tag identifying this consumer
   *  2. envelope: delivery metadata (exchange, routing key, delivery tag)
   *  3. properties: message properties
   *  4. body: message payload
   */
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("body: " + new String(body));
  }
});

// Do not close the channel or connection here: the consumer must keep waiting for messages

Consumer2

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.56.199");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123456");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";

/*
 *  1. queue: queue name
 *  2. autoAck: whether deliveries are acknowledged automatically
 *  3. callback: consumer callback object
 */
channel.basicConsume(queue2Name, true, new DefaultConsumer(channel){
  /*
   *  1. consumerTag: tag identifying this consumer
   *  2. envelope: delivery metadata (exchange, routing key, delivery tag)
   *  3. properties: message properties
   *  4. body: message payload
   */
  @Override
  public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
    System.out.println("body: " + new String(body));
  }
});

// Do not close the channel or connection here: the consumer must keep waiting for messages