博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Rabbitmq_02 Work Queues
阅读量:5260 次
发布时间:2019-06-14

本文共 2387 字,大约阅读时间需要 7 分钟。

  上篇博客讲述了最简单的Rabbitmq使用,一个发送者对应一个接收者。

  但Rabbitmq没有规定一个队列的接受者的数量,意味着可以出现一个发送者多个接收者的情况。

  就像任务队列,一个进程生成任务并放到队列中,多个进程从队列中读取任务并执行。

 

发送者

#!/usr/bin/env pythonimport pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel() # 注意声明中的durable参数channel.queue_declare(queue='task_queue', durable=True)message = ' '.join(sys.argv[1:]) or "Hello World!"# 注意发送消息时的第四个参数。channel.basic_publish(exchange='',                      routing_key='task_queue',                      body=message,                      properties=pika.BasicProperties(                         delivery_mode = 2, # make message persistent                      ))print(" [x] Sent %r" % message)connection.close()

 

 

接收者

#!/usr/bin/env pythonimport pikaimport timeconnection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))channel = connection.channel()channel.queue_declare(queue='task_queue', durable=True)print(' [*] Waiting for messages. To exit press CTRL+C')def callback(ch, method, properties, body):    print(" [x] Received %r" % body)    time.sleep(body.count(b'.'))    print(" [x] Done")    # 打开ack标志之后,一定要记住发送ack    ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_qos(prefetch_count=1)channel.basic_consume(callback,                      queue='task_queue')channel.start_consuming()

  

分配模式

  如果队列有多个接收者,那么消息如何在接收者之间进行分配?

  默认情况下采用Round-robin模式,意思就是在所有的接收者之间进行轮寻,依次分配。

  比如队列有两个接收者a,b;如果向队列中发送五条消息12345,第一条分配到a,第二条分配到b,第三条a,如此类推。最终a接收135, b接收24。

 

  另一中分配模式更注重“公平”,在上面的分配方式中,如果奇数消息处理时耗时长,偶数消息耗时短。两个接收者,则第一个接收者将一直接收繁重的任务。

  此时如果指定channel.basic_qos(prefetch_count=1)参数,则在接收到接收消息返回ack之前,不会分配给该接收者消息,往后寻找现在空闲的接收者。

 

消息确认

  仔细对比接收者的basic_consume方法和上一篇博客的区别,发现没有第三个。

  就是说默认情况下,采用消息确认机制。即消息被读取后,并不会立即删除,而是等待接收者返回ack。确保该消息被正确接收之后,再删除。如果Rabbitmq判断某个接收者失联,会将之前发送给该进程且没有收到返回ack的消息重新发送给其他进程。

  等待确认没有超时机制,即如果接收者一直存活,但总不回复ack,消息将会一直存在于内存中。因此一定要记住,默认模式下,接收者收到消息之后,调用basic_ack(delivery_tag = method.delivery_tag)方法返回ack。

  sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged命令会查看目前的队列,以及队列中已确认和未确认的消息数。

 

Rabbitmq持久化

  持久化分为两部分,对队列持久和对消息持久。

  channel.queue_declare(queue='task_queue', durable=True)中的durable参数指明了队列持久化,注意接收者中也要进行一样的声明。即使Rabbitmq重启,task_queue队列依然存在。

  消息的持久,请看发送者发送时的第四个参数,他指明了Rabbitmq发送时,会将消息同步到硬盘。注意这种同步并不是每次都真的同步到硬盘,只是将消息放到了系统缓冲,由系统决定何时将缓冲内容刷盘。

 

转载于:https://www.cnblogs.com/lovelaker007/p/10084472.html

你可能感兴趣的文章
android:scaleType属性
查看>>
SuperEPC
查看>>
mysql-5.7 innodb 的并行任务调度详解
查看>>
shell脚本
查看>>
Upload Image to .NET Core 2.1 API
查看>>
Js时间处理
查看>>
Java项目xml相关配置
查看>>
三维变换概述
查看>>
vue route 跳转
查看>>
【雷电】源代码分析(二)-- 进入游戏攻击
查看>>
Entityframework:“System.Data.Entity.Internal.AppConfig”的类型初始值设定项引发异常。...
查看>>
Linux中防火墙centos
查看>>
mysql新建用户,用户授权,删除用户,修改密码
查看>>
FancyCoverFlow
查看>>
JS博客
查看>>
如何设置映射网络驱动器的具体步骤和方法
查看>>
ASP.NET WebApi 基于OAuth2.0实现Token签名认证
查看>>
283. Move Zeroes把零放在最后面
查看>>
Visual Studio Code 打开.py代码报Linter pylint is not installed解决办法
查看>>
Python 数据类型
查看>>