消息确认
在 RabbitMQ 中,消息确认机制用于确保消息能够被正确的处理,避免在某些异常场景下的消息丢失。
常见的消息确认机制的实现模式有:
- Confirm(发布确认)模式:常用于生产端
- Ack(消费确认)模式:常用于消费端
Confirm
使用 Confirm 模式可以实现以下目标:
- 生产者发送消息后,等待服务器确认消息已成功接收,确保消息成功发送到 RabbitMQ。
- 如果消息发送失败,可以通过回调机制或等待超时来处理发送失败的情况,例如重新发送消息或记录日志。
- 可以确保消息到达服务器后的状态,避免丢失或重复发送消息。
实现 Confirm 模式的步骤如下: 下面通过 pika 简单演示下 confirm 模式的实现。
首先将信道设置为 Confirm 模式
设置 Confirm 模式
channel.confirm_select()
当信道处于 Confirm 模式时,通过调用 basic_publish()
方法发布消息时,
会返回一个布尔值,表示消息是否成功发送到 RabbitMQ 服务器。
发布消息并等待确认
if channel.basic_publish(
exchange='',
routing_key='my_routing_key',
body='test',
):
print("消息发送成功")
else:
print("消息发送失败")
除了上述方法之外,还可以为信道添加 Confirm 的回调处理函数,以便在消息确认时或发生错误时得到通知。
添加 Confirm 的回调函数
def confirm_callback(method_frame):
if method_frame.method.NAME == 'Basic.Ack':
print("消息已确认,delivery_tag:", method_frame.delivery_tag)
elif method_frame.method.NAME == 'Basic.Nack':
print("消息未确认,delivery_tag:", method_frame.delivery_tag)
elif method_frame.method.NAME == 'Basic.Return':
print("消息被退回,delivery_tag:", method_frame.delivery_tag)
channel.add_confirm_listener(confirm_callback)