Skip to main content

快速开始

安装

本文通过 Docker 启动 RabbitMQ 服务。

首先拉取镜像 rabbitmq:management,此镜像包含图形化管理界面。

拉取镜像
docker pull rabbitmq:management

然后启动服务,访问 localhost:15672 即可进入登录页面。

启动服务
docker run \
-itd \
--name rmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
-p 15672:15672 \
-p 5672:5672 \
rabbitmq:management

此外,本文采用 Pika(Python)进行代码演示。因此需要安装对应三方包。

pip安装
pip3 install pika

快速开始

通过 Pika 模块,我们可以快速初始化一个应用所需要的基础模块,主要包括:

  • 连接:物理上的连接
  • 信道:逻辑上的连接
  • 消息队列:存放消息的队列
应用初始化
import pika
# 初始化连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 初始化信道
channel = connection.channel()
# 初始化队列
channel.queue_declare(queue='hello')
# 初始化预取数量
channel.basic_qos(prefetch_count=1)

在初始化之后,我们可以通过信道简单又快速的发送一条消息。其中一条消息中最基础的属性主要包括:

  • 交换机:不同类型的交换机拥有不同的转发能力,常和路由键组合使用,用于确定一条消息应该被推送到那个队列。
  • 路由键:-
  • 数据:-
发送消息
channel.basic_publish(
exchange='',
routing_key='hello',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE,
)
)

在消息存放到消息队列后,我们可以通过 basic_consume 指定消费数据。

接收消息
def callback_handler(ch, method, properties, body):
print(" [x] Received %r" % body)

channel.basic_consume(
queue='hello',
auto_ack=True,
on_message_callback=callback_handler,
)

RPC

通过 RabbitMQ 来实现 RPC 功能,其本质还是通过生产者/消费者模式实现的。 大致流程是:

  • 生产者
  1. 创建随机唯一的排他队列
  2. 推送消息时携带上此排他队列的信息
  3. 等待回调
  • 消费者
  1. 接收消息
  2. 处理消息
  3. 将结果推送到回调队列中

代码参考如下

生产者模块
import pika
import uuid

class FibonacciRpcClient(object):

def __init__(self):
self.connection = \
pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))

self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True) # 队列名为空
self.callback_queue = result.method.queue

self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True,
)

self.response = None
self.corr_id = None

def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body

def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n),
)
self.connection.process_data_events(time_limit=None)
return int(self.response)


fibonacci_rpc = FibonacciRpcClient()
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
消费者模块
import pika
# 应用初始化
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
channel.basic_qos(prefetch_count=1)

def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)

# 异步处理函数
def callback_handler(ch, method, props, body):
n = int(body)
response = fib(n)

ch.basic_publish(
exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(
correlation_id = props.correlation_id,
),
body=str(response),
)
ch.basic_ack(delivery_tag=method.delivery_tag)


channel.basic_consume(
queue='rpc_queue',
on_message_callback=callback_handler,
)
channel.start_consuming()