快速开始
安装
本文通过 Docker 启动 RabbitMQ 服务。
首先拉取镜像 rabbitmq:management
,此镜像包含图形化管理界面。
拉取镜像
docker pull rabbitmq:management
然后启动服务,访问 localhost:15672
即可进入登录页面。
启动服务
docker run \
-itd \
--name rmq \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin \
-p 15672:15672 \
-p 5672:5672 \
rabbitmq:management
此外,本文采用 Pika(Python)进行代码演示。因此需要安装对应三方包。
pip安装
pip3 install pika
快速开始
通过 Pika 模块,我们可以快速初始化一个应用所需要的基础模块,主要包括:
- 连接:物理上的连接
- 信道:逻辑上的连接
- 消息队列:存放消息的队列
应用初始化
import pika
# 初始化连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
# 初始化信道
channel = connection.channel()
# 初始化队列
channel.queue_declare(queue='hello')
# 初始化预取数量
channel.basic_qos(prefetch_count=1)
在初始化之后,我们可以通过信道简单又快速的发送一条消息。其中一条消息中最基础的属性主要包括:
- 交换机:不同类型的交换机拥有不同的转发能力,常和路由键组合使用,用于确定一条消息应该被推送到那个队列。
- 路由键:-
- 数据:-
发送消息
channel.basic_publish(
exchange='',
routing_key='hello',
body='Hello World!',
properties=pika.BasicProperties(
delivery_mode = pika.spec.PERSISTENT_DELIVERY_MODE,
)
)
在消息存放到消息队列后,我们可以通过 basic_consume
指定消费数据。
接收消息
def callback_handler(ch, method, properties, body):
print(" [x] Received %r" % body)
channel.basic_consume(
queue='hello',
auto_ack=True,
on_message_callback=callback_handler,
)
RPC
通过 RabbitMQ 来实现 RPC 功能,其本质还是通过生产者/消费者模式实现的。 大致流程是:
- 生产者
- 创建随机唯一的排他队列
- 推送消息时携带上此排他队列的信息
- 等待回调
- 消费者
- 接收消息
- 处理消息
- 将结果推送到回调队列中
代码参考如下
生产者模块
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.connection = \
pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True) # 队列名为空
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True,
)
self.response = None
self.corr_id = None
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n),
)
self.connection.process_data_events(time_limit=None)
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
response = fibonacci_rpc.call(30)
print(" [.] Got %r" % response)
消费者模块
import pika
# 应用初始化
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
channel.basic_qos(prefetch_count=1)
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
# 异步处理函数
def callback_handler(ch, method, props, body):
n = int(body)
response = fib(n)
ch.basic_publish(
exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(
correlation_id = props.correlation_id,
),
body=str(response),
)
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_consume(
queue='rpc_queue',
on_message_callback=callback_handler,
)
channel.start_consuming()