使用介绍 -- Java

名词说明

  • Virtual Host 包含若干个 Exchange 和 Queue,表示一个节点
  • Exchange 接受客户端发送的消息,并根据 Binding 将消息路由给服务器中的队列,经常使用的 Exchange 包括 direct, fanout, topic 类型
  • Binding 连接 Exchange 和 Queue,包含路由规则
  • Queue 消息队列,存储还未被消费的消息
  • Message Header+Body
  • Channel 通道,执行 AMQP 的命令, 一个连接可创建多个通道以节省资源
    • RoutingKey 路由键,一个 queue 绑定一个路由键,每个消息发送时也会指定一个路由键,exchange 只会将消息路由到 RoutingKey 匹配的 queue 中

使用说明

入门示例

首先根据管理页面提示创建一个消息队列实例。 创建成功后,点击进入实例详情页,然后点击页面上的账号与密码,获取对应的 RabbitMQ 用户名密码。

下面以 RabbitMQ 官方提供的 java client sdk 为例:

生产者

创建连接:

//创建客户端配置
ConnectionFactory factory = new ConnectionFactory();
//配置为 Rabbitmq 地址
factory.setHost("IP");     
factory.setPort(5672);
//填写RabbitMQ账号, 在[账户与密码]中可查看。
factory.setUsername(username);     
//填写RabbitMQ密码, 在[账户与密码]中可查看。
factory.setPassword(password);
//创建一个连接
Connection connection = factory.newConnection();
//打开一个通道
Channel channel = connection.createChannel();

创建Queue,并绑定交换机:

//创建一个名为 queue1 的队列
channel.queueDeclare("queue1", false, false, false, null);

第二个参数 durable false 设置队列为非持久化,第三个参数 exclusive false 设置队列为非独占,第四个参数 autoDelete 设置为 false 表示队列上无连接时自动删除。 队列默认会绑定到 RabbitMQ 的一个 direct 类型且名为""的 exchange。direct 交换器的特点是只会将收到的消息路由到 RoutingKey 匹配的队列中去,如果无法匹配,默认情况下发送到该 exchange 的消息消息将被丢弃,如果有多个 Queue 上均有匹配的 RoutingKey,则 RabbitMQ 会自动选择一个 queue 发送消息。 或者,我们也可以自己声明一个交换机并将队列绑定上去:

//创建交换机
channel.exchangeDeclare("exchangeName", "direct", false, false, null);

第三个和第四个参数设置 exchange 为非持久化,以及非自动删除:

//绑定队列到刚刚创建的exchange
channel.queueBind("queue1, "exchangeName", "");

发布消息:

 channel.basicPublish("exchangeName", "key", null, "this is a message".getBytes("UTF-8"));

第一个参数表示 exchange 名称,第二个参数为消息的 RoutingKey,第三个参数为消息选项(例如持久化选项等),第四个参数表示消息内容。

消费者

创建连接:

//创建客户端配置
ConnectionFactory factory = new ConnectionFactory();
//配置为Rabbitmq地址
factory.setHost("IP");     
factory.setPort(5672);
//填写RabbitMQ账号, 在[账户与密码]中可查看。
factory.setUsername(username);     
//填写RabbitMQ密码, 在[账户与密码]中可查看。
factory.setPassword(password);
//创建一个连接
Connection connection = factory.newConnection();
//打开一个通道
Channel channel = connection.createChannel();

消费消息:

Consumer consumer = new DefaultConsumer(channel) {
    //有消息到来时,回调该方法
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
      throws IOException {
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
    }
};
//启动消费者,参数2表示使用autoAck自动确认机制
channel.basicConsume("queue1", true, consumer);

这里直接使用了 DefaultConsumer 类实现并覆盖了 handleDelivery 方法,当有消息到来的时候,client 会回调 handleDelivery 方法,basicConsume 表示启动消费者,并等待消息到来。

通常情况下,消费端也会调用 exchangeDeclare 方法和 queueDeclare 方法去声明交换器和队列,以防止生产者还未启动而导致消费者获取不到 exchange 和 queue 的相关信息。

保证消息可靠性

Attention

RabbitMQ 提供了三种机制来保证消息的可靠性,他们分别为生产者 confirm 机制、 消费者 ack 机制、persist 持久化机制,注意开启这 3 个选项会对性能有较大的影响,所以应当根据实际情况选择。下面介绍如何使用这 3 种机制。

生产者 confirm 机制

一旦开启了生产者 confirm 机制,rabbitmq 会在处理完消息后,在当前 channel 上通过发送 basic.ack 的方式对其进行 confirm 。

开启confirm:

//开启confirm:发送select帧
channel.confirmSelect();

确认消息被发布:

其中收到消息的 delivery-tag 域的值标识了被 confirm 消息的序列号。RabbitMQ 也可以通过设置 basic.ack 中的 multiple 域来表明到指定序列号为止的所有消息都已被正确的处理了。 调用 channel.waitForConfirms()方法等待前面发送的消息被确认,返回 true 表示前面的消息都被正确的发布了。这里最好采用批量确认(发送一批消息后调用一次该方法)以获得较好的性能。

消费者 ACK 机制

入门示例中打开了 autoAck 自动确认机制,表示一旦消费者接收到了该消息,不管消费者是否处理完,立刻向 RabbitMQ 返回一个确认消息。有时候我们还需要保证消费者已经处理完了消息才能给 RabbitMQ 发送这条确认消息,通过以下方法设置:

//第二个参数autoAck参数为false,表示启动消费者时,关闭自动确认机制
channel.basicConsume("queue1", false, consumer);

我们可以通过调用 channel.basicAck 方法来确认一个消息已被正常的处理,或者调用我们可以通过调用 channel.basicNack 方法拒绝一个或一批消息。

persist 持久化机制

前面的例子 exchange, queue, message都是非持久化的,如果消息未被确认,一旦重启 RabbitMQ,所有的信息都将丢失。可以通过以下字段进行持久化设置:

设置queue为持久化类型:

// 设置第二个参数durable为true指定了queue为持久化类型,重启时该queue信息不会丢失
channel.queueDeclare("queue1", true, false, false, null); 

设置exchange为持久化类型:

// 设置第三个参数durable为true指定了exchange为持久化类型,重启时该exchange信息不会丢失
channel.exchangeDeclare("exchangeName", "direct", true, false, null);

设置message为持久化类型:

// 第三个参数指定了消息属性为持久化的,发送的消息将被rabbitmq落盘
channel.basicPublish("exchangeName", "key", MessageProperties.PERSISTENT_TEXT_PLAIN, "this is a message".getBytes("UTF-8"));

消费端负载均衡

当一条消息到达了 Queue 中,如果该队列上同时有多个 Consumer,默认情况下 RabbitMQ 采用 Round-robin dispatching 循环分发的方法轮流给每个 Consumer 发送消息。当某个 Consumer 任务比较繁忙,就会导致有的 Consumer 几乎无事可做,有的 Consumer 却毫无休息的机会。此时可以打开消费者负载均衡选项,用来控制每个 Consumer 队列的最大待处理消息个数:

// 表示当前consumer同时最多处理prefetchCount个消息
channel.basicQos(prefetchCount);

除了前文提到的 direct 类型的 exchange,RabbitMQ 提供了另外 2 种类型的 exchange:fanout exchange 和 topic exchange:

  • fanout exchange 会将消息发送到所有绑定到该 exchange 的 queue 中而不关注 RoutingKey。
  • topic exchange 更为灵活,并且使用更为广泛,它会根据 RoutingKey 进行模式匹配,只要 RoutingKey 匹配就会发送到对应的 queue 中。

RoutingKey正则匹配:

每一条消息 RoutingKey 会包含若干个标识符(以点号分隔),上图中的 * 表示可以匹配一个标识符,# 表示可以匹配 0 个或多个标识符。 上图中 Q1 与 .orange. 绑定,Q2 与 ..rabbit 和 lazy.# 绑定。假设一条消息的 routingKey 为 lazy.orange.pig,那么 Q1 和 Q2 都将与这条消息匹配。通过对 RoutingKey 和 BindingKey 的精心设计,可以满足点对点(私信)、组播、广播等业务场景的通信需求。

事务消息

事务消息对性能的影响较大,不推荐使用。简单介绍一下相关的接口:

在当前channel上打开TX事务模式:

channel.txSelect();

在当前channel上提交一个TX事务:

 channel.txCommit();

在当前channel上回滚一个TX事务:

channel.txRollback();

进阶使用

TTL, 延迟队列

RabbitMQ 允许对 queue 和 message 设置 TTL 值(过期时间)

Per-Queue Message TTL: 通过在声明 queue 的时候设置 x-message-ttl,可以控制被 publish 到 queue 中的 message 被丢弃前能够存活的时间:

// 声明一个消息过期时间为60s的队列
Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-message-ttl", 60000);  
channel.queueDeclare("queue1", false, false, false, args);

Per-Message TTL: 发送消息时设置 expiration 字段可以指定消息的过期时间:

// 声明一条过期时间为60s的消息
AMQP.BasicProperties properties = new AMQP.BasicProperties();  
properties.setExpiration("60000");  
channel.basicPublish("myexchange", "routingkey", properties, "This is a message".getBytes());

利用TTL和DLX实现延迟队列:

dead-letter-exchange(DLX)是一种特殊的 exchange,当消息在一个队列中变成死信后(消息被拒绝且 requeue 为 false,或 TTL 过期,或队列达到最大长度),它能被重新 publish 到 DLX。

通过 TTL 和 DLX 的配合使用,可以实现高可靠的延迟队列,方法如下:

HashMap<String, Object> arguments = new HashMap<String, Object>();  
// 设置DLX为rabbitmq自带的amq.direct exchange
arguments.put("x-dead-letter-exchange", "amq.direct");  
// 设置死信队列中的消息RoutingKey为dlxKey
arguments.put("x-dead-letter-routing-key", "dlxKey");  
// 使用上面设置的参数,声明一个名为delayQueue的队列,默认绑定到rabbitmq自带的名为""的exchange
channel.queueDeclare("delayQueue", false, false, false, args);

再声明一个接收延迟消息的队列:

// 声明一个名为RECV_QUEUE的queue
channel.queueDeclare("RECV_QUEUE", true, false, false, null);  
// 绑定到amq.direct exchange,且绑定的RoutingKey为dlxKey
channel.queueBind("RECV_QUEUE", "amq.direct", "dlxKey");

投递消息:

// 声明一条过期时间为60s的消息
AMQP.BasicProperties properties = new AMQP.BasicProperties();  
properties.setExpiration("60000");  
//投递到默认的""exchange,消息过期后会被重新投递到amq.direct,然后路由到RECV_QUEUE中
channel.basicPublish("", "routingkey", properties, "This is a message".getBytes());

接收消息:

//消费者在RECV_QUEUE队列中等待消息到来
channel.basicConsume("RECV_QUEUE", true, consumer);

队列自动删除

queue.declare 命令中的 x-expires 参数控制 queue 被自动删除前可以处于未使用状态的时间,即当前 queue 上没有任何 Consumer 的队列被自动删除的时间:

// queue未被使用30分钟后自动删除
Map<String, Object> args = new HashMap<String, Object>();  
args.put("x-expires", 1800000);  
channel.queueDeclare("myqueue", false, false, false, args);  

队列最大长度

通过 x-max-length 参数可以设置单个队列的最大长度:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length", 10);
channel.queueDeclare("myqueue", false, false, false, args);

Attention

当队列达到最大长度限制,此时新消息到达时,队列头部的消息将被丢弃。

优先级队列

RabbitMQ 提供了 x-max-priority 参数以支持优先级队列:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("my-priority-queue", true, false, false, args);

x-max-priority 参数的值表示该队列支持的最大优先级。消息优先级可以通过下列方法设置:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(5);
channel.basicPublish("exchange_priority", "rk_priority", builder.build(), ("messages-" + i).getBytes());

消息优先级默认为 0,其范围在 0~x-max-priority 之间,对于大于 x-max-priority 的值将作为 x-max-priority 处理。

使用消息优先级特性将带来一些性能损耗,并且如果一个空闲的消费者连接到空队列,此时新消息进入的话,消息不会花费任何时间在队列中等待,消息将直接被消费,此时的消息优先级将没有意义。

policy 策略

如果我们使用客户端声明了一个 queue 并设置 TTL 之后想去更新这个 TTL,除了重新声明队列,并重新设置 TTL 的办法之外,RabbitMQ 提供了一种称为 policy 的更为灵活的办法去更新相关的参数:

policy 通过正则表达式匹配指定的对象(queue 或者 exchange),然后可以在这些 queue 或者 exchange 上应用我们的策略设置。管理平台相关 policy 的设置页面如下:

镜像队列

在集群环境中,队列只有元数据会在集群的所有节点同步,但队列中的数据只会存在于一个节点,数据没有冗余且容易丢,甚至在 durable 的情况下,如果所在的服务器节点宕机,就要等待节点恢复才能继续提供消息服务。

NQS 利用 RabbiMQ 提供的镜像队列,高可用模式下队列数据将在所有节点同步并落盘,因此任一一个节点宕机将不影响服务的可用性。镜像队列也有主从之分,正常情况下只有 master 才对外服务,slave 节点只提供备份服务。同样的,我们也可以通过 policy 来进行镜像队列的设置(上图的HA选项)。

Alternate Exchange(AE)

当一条消息无法被一个 exchange 路由到一个 queue 时,默认情况下消息将被丢弃。某些情况,需要知道被 rabbitmq 丢弃的消息有哪些,如果定义了 AE, 消息将被路由到 AE 绑定的 queue 当中。

注册流控事件

当消费者的消费速度小于生产者的生成速度时,会出现消息堆积的情况,此时的性能将会出现大幅度下降,甚至还可能会出现流控,发生流控时的 RabbitMQ 将会停止生产。很多时候我们需要知道何时发生了流控事件,除了 NQS 提供的事件报警机制外,利用客户端本身提供的功能也能获得该通知:

ConnectionFactory factory = new ConnectionFactory();
        Connection connection = factory.newConnection();
        connection.addBlockedListener(new BlockedListener() {
            public void handleBlocked(String reason) throws IOException {
                // Connection is now blocked
            }

            public void handleUnblocked() throws IOException {
                // Connection is now unblocked
            }
        });