使用介绍
Attention
出于安全考虑,我们不提供公网 IP 来连接 RabbitMQ,因此如果要在本地连接调试需要先参考文档如何使用网易云 OpenVPN ,根据自己的实例所在可用区使用或自行搭建 VPN 连接或直接通过与实例同可用区或同 VPC 的云服务器使用 RabbitMQ
名词说明
- Virtual Host 包含若干个Exchange和Queue,表示一个节点
- Exchange 接受客户端发送的消息,并根据Binding将消息路由给服务器中的队列,经常使用的Exchange包括direct, fanout, topic类型
- Binding 连接Exchange和Queue,包含路由规则
- Queue 消息队列,存储还未被消费的消息
- Message Header+Body
- Channel 通道,执行AMQP的命令, 一个连接可创建多个通道以节省资源
- RoutingKey 路由键,一个queue绑定一个路由键,每个消息发送时也会指定一个路由键,exchange只会将消息路由到RoutingKey匹配的queue中
使用说明
入门示例
首先根据管理页面提示创建一个消息队列实例。 创建成功后,点击进入实例详情页,然后点击页面上的账号与密码,获取对应的RabbitMQ用户名密码。 下面以RabbitMQ官方提供的java client sdk为例:
生产者
创建连接
//创建客户端配置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1"); //配置为Rabbitmq地址
factory.setPort(5672);
factory.setUsername(username); //配置为上面获取的用户名密码
factory.setPassword(password);
//创建一个连接
Connection connection = factory.newConnection();
//打开一个通道
Channel channel = connection.createChannel();
创建Queue,并绑定交换机
//创建一个名为queue1的队列
channel.queueDeclare("queue1", false, false, false, null);
第二个参数durable false设置队列为非持久化,第三个参数exclusive false设置队列为非独占,第四个参数autoDelete设置为false表示队列上无连接时自动删除。 队列默认会绑定到RabbitMQ的一个direct类型且名为""的exchange。direct交换器的特点是只会将收到的消息路由到RoutingKey匹配的队列中去,如果无法匹配,默认情况下发送到该exchange的消息消息将被丢弃,如果有多个Queue上均有匹配的RoutingKey,则RabbitMQ会自动选择一个queue发送消息。 或者,我们也可以自己声明一个交换机并将队列绑定上去:
//创建交换机
channel.exchangeDeclare("exchangeName", "direct", false, false, null);
第三个和第四个参数设置exchange为非持久化,以及非自动删除。
//绑定队列到刚刚创建的exchange
channel.queueBind("queue1, "exchangeName", "");
发布消息
channel.basicPublish("exchangeName", "key", null, "this is a message".getBytes("UTF-8"));
第一个参数表示exchange名称,第二个参数为消息的RoutingKey,第三个参数为消息选项(例如持久化选项等),第四个参数表示消息内容。
消费者
创建连接
//创建客户端配置
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("127.0.0.1"); //配置为Rabbitmq地址
factory.setPort(5672);
factory.setUsername(username); //配置为上面获取的用户名密码
factory.setPassword(password);
//创建一个连接
Connection connection = factory.newConnection();
//打开一个通道
Channel channel = connection.createChannel();
消费消息
Consumer consumer = new DefaultConsumer(channel) {
//有消息到来时,回调该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
throws IOException {
String message = new String(body, "UTF-8");
System.out.println(" [x] Received '" + message + "'");
}
};
//启动消费者,参数2表示使用autoAck自动确认机制
channel.basicConsume("queue1", true, consumer);
这里直接使用了DefaultConsumer类实现并覆盖了handleDelivery方法,当有消息到来的时候,client会回调handleDelivery方法,basicConsume表示启动消费者,并等待消息到来。
通常情况下,消费端也会调用exchangeDeclare方法和queueDeclare方法去声明交换器和队列,以防止生产者还未启动而导致消费者获取不到exchange和queue的相关信息。
保证消息可靠性
Attention
RabbitMQ提供了三种机制来保证消息的可靠性,他们分别为生产者confirm机制、 消费者ack机制、persist持久化机制,注意开启这3个选项会对性能有较大的影响,所以应当根据实际情况选择。下面介绍如何使用这3种机制。
生产者 confirm 机制
一旦开启了生产者confirm机制,rabbitmq会在处理完消息后,在当前 channel 上通过发送 basic.ack 的方式对其进行 confirm 。
开启confirm
//开启confirm:发送select帧
channel.confirmSelect();
确认消息被发布
其中收到消息的delivery-tag 域的值标识了被 confirm 消息的序列号。RabbitMQ也可以通过设置 basic.ack 中的 multiple 域来表明到指定序列号为止的所有消息都已被正确的处理了。 调用 channel.waitForConfirms()方法等待前面发送的消息被确认,返回true表示前面的消息都被正确的发布了。这里最好采用批量确认(发送一批消息后调用一次该方法)以获得较好的性能。
消费者ACK机制
入门示例中打开了autoAck自动确认机制,表示一旦消费者接收到了该消息,不管消费者是否处理完,立刻向RabbitMQ返回一个确认消息。有时候我们还需要保证消费者已经处理完了消息才能给RabbitMQ发送这条确认消息,通过以下方法设置:
//第二个参数autoAck参数为false,表示启动消费者时,关闭自动确认机制
channel.basicConsume("queue1", false, consumer);
我们可以通过调用channel.basicAck方法来确认一个消息已被正常的处理,或者调用我们可以通过调用channel.basicNack方法拒绝一个或一批消息。
persist持久化机制
前面的例子exchange, queue, message都是非持久化的,如果消息未被确认,一旦重启RabbitMQ,所有的信息都将丢失。可以通过以下字段进行持久化设置:
设置queue为持久化类型
// 设置第二个参数durable为true指定了queue为持久化类型,重启时该queue信息不会丢失
channel.queueDeclare("queue1", true, false, false, null);
设置exchange为持久化类型
// 设置第三个参数durable为true指定了exchange为持久化类型,重启时该exchange信息不会丢失
channel.exchangeDeclare("exchangeName", "direct", true, false, null);
设置message为持久化类型
// 第三个参数指定了消息属性为持久化的,发送的消息将被rabbitmq落盘
channel.basicPublish("exchangeName", "key", MessageProperties.PERSISTENT_TEXT_PLAIN, "this is a message".getBytes("UTF-8"));
消费端负载均衡
当一条消息到达了Queue中,如果该队列上同时有多个Consumer,默认情况下RabbitMQ采用 Round-robin dispatching循环分发的方法轮流给每个Consumer发送消息。当某个Consumer任务比较繁忙,就会导致有的Consumer几乎无事可做,有的Consumer却毫无休息的机会。此时可以打开消费者负载均衡选项,用来控制每个Consumer队列的最大待处理消息个数:
// 表示当前consumer同时最多处理prefetchCount个消息
channel.basicQos(prefetchCount);
除了前文提到的direct类型的exchange,RabbitMQ提供了另外2种类型的exchange:fanout exchange和topic exchange:
- fanout exchange会将消息发送到所有绑定到该exchange的queue中而不关注RoutingKey。
- topic exchange更为灵活,并且使用更为广泛,它会根据RoutingKey进行模式匹配,只要RoutingKey匹配就会发送到对应的queue中。
RoutingKey正则匹配:
每一条消息RoutingKey会包含若干个标识符(以点号分隔),上图中的表示可以匹配一个标识符,#表示可以匹配0个或多个标识符。 上图中Q1与.orange.绑定,Q2与.*.rabbit和lazy.#绑定。假设一条消息的routingKey为lazy.orange.pig,那么Q1和Q2都将与这条消息匹配。通过对RoutingKey和BindingKey的精心设计,可以满足点对点(私信)、组播、广播等业务场景的通信需求。
事务消息
事务消息对性能的影响较大,不推荐使用。简单介绍一下相关的接口:
在当前channel上打开TX事务模式
channel.txSelect();
在当前channel上提交一个TX事务
channel.txCommit();
在当前channel上回滚一个TX事务
channel.txRollback();
进阶使用
TTL, 延迟队列
RabbitMQ允许对queue和message设置TTL值(过期时间)
Per-Queue Message TTL: 通过在声明queue的时候设置x-message-ttl,可以控制被 publish 到 queue 中的 message 被丢弃前能够存活的时间。
// 声明一个消息过期时间为60s的队列
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 60000);
channel.queueDeclare("queue1", false, false, false, args);
Per-Message TTL: 发送消息时设置expiration字段可以指定消息的过期时间
// 声明一条过期时间为60s的消息
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
channel.basicPublish("myexchange", "routingkey", properties, "This is a message".getBytes());
利用TTL和DLX实现延迟队列:
dead-letter-exchange(DLX)是一种特殊的exchange,当消息在一个队列中变成死信后(消息被拒绝且requeue为false,或TTL过期,或队列达到最大长度),它能被重新publish到DLX。
通过TTL和DLX的配合使用,可以实现高可靠的延迟队列,方法如下:
HashMap<String, Object> arguments = new HashMap<String, Object>();
// 设置DLX为rabbitmq自带的amq.direct exchange
arguments.put("x-dead-letter-exchange", "amq.direct");
// 设置死信队列中的消息RoutingKey为dlxKey
arguments.put("x-dead-letter-routing-key", "dlxKey");
// 使用上面设置的参数,声明一个名为delayQueue的队列,默认绑定到rabbitmq自带的名为""的exchange
channel.queueDeclare("delayQueue", false, false, false, args);
再声明一个接收延迟消息的队列:
// 声明一个名为RECV_QUEUE的queue
channel.queueDeclare("RECV_QUEUE", true, false, false, null);
// 绑定到amq.direct exchange,且绑定的RoutingKey为dlxKey
channel.queueBind("RECV_QUEUE", "amq.direct", "dlxKey");
投递消息:
// 声明一条过期时间为60s的消息
AMQP.BasicProperties properties = new AMQP.BasicProperties();
properties.setExpiration("60000");
//投递到默认的""exchange,消息过期后会被重新投递到amq.direct,然后路由到RECV_QUEUE中
channel.basicPublish("", "routingkey", properties, "This is a message".getBytes());
接收消息:
//消费者在RECV_QUEUE队列中等待消息到来
channel.basicConsume("RECV_QUEUE", true, consumer);
队列自动删除
queue.declare 命令中的 x-expires 参数控制 queue 被自动删除前可以处于未使用状态的时间,即当前queue上没有任何Consumer的队列被自动删除的时间。
// queue未被使用30分钟后自动删除
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-expires", 1800000);
channel.queueDeclare("myqueue", false, false, false, args);
队列最大长度
通过x-max-length参数可以设置单个队列的最大长度:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-length", 10);
channel.queueDeclare("myqueue", false, false, false, args);
Attention
当队列达到最大长度限制,此时新消息到达时,队列头部的消息将被丢弃。
优先级队列
RabbitMQ提供了x-max-priority参数以支持优先级队列。
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-max-priority", 10);
channel.queueDeclare("my-priority-queue", true, false, false, args);
x-max-priority参数的值表示该队列支持的最大优先级。消息优先级可以通过下列方法设置:
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.priority(5);
channel.basicPublish("exchange_priority", "rk_priority", builder.build(), ("messages-" + i).getBytes());
消息优先级默认为0,其范围在0~x-max-priority之间,对于大于x-max-priority的值将作为x-max-priority处理。
使用消息优先级特性将带来一些性能损耗,并且如果一个空闲的消费者连接到空队列,此时新消息进入的话,消息不会花费任何时间在队列中等待,消息将直接被消费,此时的消息优先级将没有意义。
policy策略
如果我们使用客户端声明了一个queue并设置TTL之后想去更新这个TTL,除了重新声明队列,并重新设置TTL的办法之外,RabbitMQ提供了一种称为policy的更为灵活的办法去更新相关的参数:
policy通过正则表达式匹配指定的对象(queue或者exchange),然后可以在这些queue或者exchange上应用我们的策略设置。管理平台相关policy的设置页面如下:
镜像队列
在集群环境中,队列只有元数据会在集群的所有节点同步,但队列中的数据只会存在于一个节点,数据没有冗余且容易丢,甚至在durable的情况下,如果所在的服务器节点宕机,就要等待节点恢复才能继续提供消息服务。
NQS利用RabbiMQ提供的镜像队列,高可用模式下队列数据将在所有节点同步并落盘,因此任一一个节点宕机将不影响服务的可用性。镜像队列也有主从之分,正常情况下只有master才对外服务,slave节点只提供备份服务。同样的,我们也可以通过policy来进行镜像队列的设置(上图的HA选项)。
Alternate Exchange(AE)
当一条消息无法被一个exchange路由到一个queue时,默认情况下消息将被丢弃。某些情况,需要知道被rabbitmq丢弃的消息有哪些,如果定义了AE, 消息将被路由到AE绑定的queue当中。
注册流控事件
当消费者的消费速度小于生产者的生成速度时,会出现消息堆积的情况,此时的性能将会出现大幅度下降,甚至还可能会出现流控,发生流控时的RabbitMQ将会停止生产。很多时候我们需要知道何时发生了流控事件,除了NQS提供的事件报警机制外,利用客户端本身提供的功能也能获得该通知:
ConnectionFactory factory = new ConnectionFactory();
Connection connection = factory.newConnection();
connection.addBlockedListener(new BlockedListener() {
public void handleBlocked(String reason) throws IOException {
// Connection is now blocked
}
public void handleUnblocked() throws IOException {
// Connection is now unblocked
}
});