使用介绍 -- Python

Attention

出于安全考虑,我们不提供公网 IP 来连接 RabbitMQ,因此如果要在本地连接调试需要先参考文档如何使用网易云 OpenVPN ,根据自己的实例所在可用区使用或自行搭建 VPN 连接或直接通过与实例同可用区或同 VPC 的云服务器使用 RabbitMQ

本文通过 hello world 的示例描述消息队列的具体使用方法,开发语言为 python,版本 2.7。示例分为生产者和消费者,对应的文件分别为 hello_world_producer.py 和 hello_world_consumer.py。运行环境 Centos7.2。

创建消息队列实例

运行生产者和消费者之前需要新建一个消息队列的实例,本文中采用单实例的部署方式。 消息队列实例的具体新建方法参见文档创建实例

编写生产者、消费者代码

由于生产者和消费者代码中需要用到 python 模块 pika,因此需要提前安装好此模块,可以使用 pip 直接安装,也可下载模块后进行安装。pip 安装方法如下:

yum install python-pip -y
pip install pika

之后书写生产者和消费者代码, 生产者文件 hello_world_producer.py 代码如下:

#!/usr/bin/env python
# -*- coding: UTF-8 -*-
import pika
import sys 
#用户信息认证,填写RabbitMQ账号、密码, 在[账户与密码]中可查看。
credentials = pika.PlainCredentials("username", "password")
#配置为RabbitMQ地址
conn_params = pika.ConnectionParameters("IP", credentials = credentials)
#建立到服务器的连接
conn_broker = pika.BlockingConnection(conn_params)
#获得信道
channel = conn_broker.channel()
#声明交换器
channel.exchange_declare(exchange="hello-exchange", type = "direct", passive=False, durable = True, auto_delete = False)
msg = sys.argv[1]
msg_props = pika.BasicProperties()
#纯文本消息
msg_props.content_type = "text/plain"
#发布消息
channel.basic_publish(body = msg, exchange = "hello-exchange", properties = msg_props, routing_key = "hola")

消费者文件 hello_world_consumer.py 代码如下:

#!/user/bin/env python
# -*- coding: UTF-8 -*- 
import pika
#用户信息认证,填写RabbitMQ账号、密码, 在[账户与密码]中可查看。。
credentials = pika.PlainCredentials("username", "password")
#配置为RabbitMQ地址         
conn_params = pika.ConnectionParameters("IP", credentials = credentials)
#建立到服务器的连接
conn_broker = pika.BlockingConnection(conn_params)
#获得信道
channel = conn_broker.channel()
#声明交换器
channel.exchange_declare(exchange = "hello-exchange", type = "direct", passive = False, durable = True, auto_delete = False)
#声明队列
channel.queue_declare(queue = "hello-queue")
#队列交换器绑定
channel.queue_bind(queue = "hello-queue", exchange = "hello-exchange", routing_key = "hola")
def msg_consumer(channel, method, header, body):
  "消息处理函数"
  #消息确认
  channel.basic_ack(delivery_tag = method.delivery_tag)
  if body == "quit":
    #停止消费并退出
    channel.basic_cancel(consumer_tag = "hello-consumer")
    channel.stop_consuming()
  else:
    print body
#订阅
channel.basic_consume(msg_consumer, queue = "hello-queue", consumer_tag = "hello-consumer")
#消费
channel.start_consuming()

运行代码

首先运行消费者:

python hello_world_consumer.py

在另一个窗口中运行生产者,并发送消息 "hello world":

python hello_world_producer.py "hello world"

此时消费者可以接收到消息 "hello world", 如图:

生产者发送消息 "quit" 后消费者退出。