上篇博客讲述了最简单的Rabbitmq使用,一个发送者对应一个接收者。
但Rabbitmq没有规定一个队列的接受者的数量,意味着可以出现一个发送者多个接收者的情况。
就像任务队列,一个进程生成任务并放到队列中,多个进程从队列中读取任务并执行。
发送者
#!/usr/bin/env pythonimport pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel() # 注意声明中的durable参数channel.queue_declare(queue='task_queue', durable=True)message = ' '.join(sys.argv[1:]) or "Hello World!"# 注意发送消息时的第四个参数。channel.basic_publish(exchange='', routing_key='task_queue', body=message, properties=pika.BasicProperties( delivery_mode = 2, # make message persistent ))print(" [x] Sent %r" % message)connection.close()
接收者
#!/usr/bin/env pythonimport pikaimport timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue='task_queue', durable=True)print(' [*] Waiting for messages. To exit press CTRL+C')def callback(ch, method, properties, body): print(" [x] Received %r" % body) time.sleep(body.count(b'.')) print(" [x] Done") # 打开ack标志之后,一定要记住发送ack ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_qos(prefetch_count=1)channel.basic_consume(callback, queue='task_queue')channel.start_consuming()
分配模式
如果队列有多个接收者,那么消息如何在接收者之间进行分配?
默认情况下采用Round-robin模式,意思就是在所有的接收者之间进行轮寻,依次分配。
比如队列有两个接收者a,b;如果向队列中发送五条消息12345,第一条分配到a,第二条分配到b,第三条a,如此类推。最终a接收135, b接收24。
另一中分配模式更注重“公平”,在上面的分配方式中,如果奇数消息处理时耗时长,偶数消息耗时短。两个接收者,则第一个接收者将一直接收繁重的任务。
此时如果指定channel.basic_qos(prefetch_count=1)参数,则在接收到接收消息返回ack之前,不会分配给该接收者消息,往后寻找现在空闲的接收者。
消息确认
仔细对比接收者的basic_consume方法和上一篇博客的区别,发现没有第三个。
就是说默认情况下,采用消息确认机制。即消息被读取后,并不会立即删除,而是等待接收者返回ack。确保该消息被正确接收之后,再删除。如果Rabbitmq判断某个接收者失联,会将之前发送给该进程且没有收到返回ack的消息重新发送给其他进程。
等待确认没有超时机制,即如果接收者一直存活,但总不回复ack,消息将会一直存在于内存中。因此一定要记住,默认模式下,接收者收到消息之后,调用basic_ack(delivery_tag = method.delivery_tag)方法返回ack。
sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged命令会查看目前的队列,以及队列中已确认和未确认的消息数。
Rabbitmq持久化
持久化分为两部分,对队列持久和对消息持久。
channel.queue_declare(queue='task_queue', durable=True)中的durable参数指明了队列持久化,注意接收者中也要进行一样的声明。即使Rabbitmq重启,task_queue队列依然存在。
消息的持久,请看发送者发送时的第四个参数,他指明了Rabbitmq发送时,会将消息同步到硬盘。注意这种同步并不是每次都真的同步到硬盘,只是将消息放到了系统缓冲,由系统决定何时将缓冲内容刷盘。