RabbitMQ是实现AMQP(高级消息队列协议)的消息中间件的一种,最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。和普通的queue比较起来,生产和消费不再针对内存里的一个Queue对象,而是某台服务器上的RabbitMQ Server实现的消息队列。

RabbitMQ主要是为了实现系统之间的双向解耦而实现的。当生产者大量产生数据时,消费者无法快速消费,那么需要一个中间层。保存这个数据。

首先来他需要安装erlang语言包和rabbitmq-server,启动服务,然后打开端口5672

服务器(CentOS7)

yum install erlangyum install rabbitmq-serversystemctl start rabbitmq-serversystemctl status rabbitmq-serverfirewall-cmd --add-port=5672/tcp --permanentsystemctl restart firewalld

Python客户端(windows),安装pika模块

C:\WINDOWS\system32>pip install pikaCollecting pika  Downloading pika-0.10.0-py2.py3-none-any.whl (92kB)    100% |################################| 102kB 632kB/sInstalling collected packages: pikaSuccessfully installed pika-0.10.0

现在看看Python里面如何使用:

例1 Hello World

发送

import pika# ######################### 生产者 ##########################绑定到一个broker上面connection = pika.BlockingConnection(pika.ConnectionParameters(        host='sydnagios'))channel = connection.channel()#创建一个queue用来传输信息channel.queue_declare(queue='hello1')#RabbitMQ不可以直接给queue发送信息,必须通过exchange,这里空字符串表示默认的exchangechannel.basic_publish(exchange='',                      routing_key='hello1',                      body='Hello World!')print(" [x] Sent 'Hello World!'")#清空queue,断开连接connection.close()

接收

#!/usr/bin/env pythonimport pika# ########################## 消费者 ##########################connection = pika.BlockingConnection(pika.ConnectionParameters(    host='sydnagios'))channel = connection.channel()# 和生产者一样,这里也需要定义一个queue,这是因为我们不知道到底是生产者和消费者谁先执行;这个queue即使多次定义也只会创建一个对象channel.queue_declare(queue='hello1')# 每当接收到一个信息,pika库会自动调用callback函数def callback(ch, method, properties, body):    print(" [x] Received %r" % body)# 指定callback从哪个queue获取信息    channel.basic_consume(callback,                      queue='hello1',                      no_ack=True)print(' [*] Waiting for messages. To exit press CTRL+C')# 死循环,不停阻塞接收信息channel.start_consuming()

如果在RabbitMQ的服务器上执行以下操作,可以看见queue里面有几个信息

比如发送者发送了2条信息之后,可以看见hello1数目变成2,如果我用客户端去取了2次,那么他又会变成0

[root@sydnagios nagvis]# sudo rabbitmqctl list_queuesListing queues ...hello   0hello1  2kakaxi1 0...done.

例2, 工作队列和可靠性

例1里面我们通过一个queue发送和接受了信息;我们也可以创建一个工作队列(Task Queue)来给多个客户端发送信息,这种模型适合于那种特别耗时的任务;感觉这个和之前线程池的方式类似,所有的任务放在queue里面,然后每个线程(客户端)不停地去取任务执行。

在默认情况下,任务的分发是通过round-robin(轮换)的方式实现的,比如C1接受任务1,C2任务2,C1任务3,C2任务4...这样的缺点是如果任务的耗时不同,可能C1一直在执行一堆繁重的任务,而C2分到的都是轻量级的任务,一直很空闲。我们可以通过指定channel.basic_qos(prefetch_count=1)来实现公平分发,换句话说消息只会分发给空闲的客户端。

RabbitMQ里面有3种方式来确保消息的可靠性。

第一个方式是在消费者方面进行ACK的确认,每次成功接收消息之后发送确认信号,如果意外中止了,RabbitMQ会把任务重新放入队列中,然后发给下一个客户端,比如C1如果刚刚收到任务1就挂了,因为C1没有确认,那么RabitMQ会把任务1重新发给C2;确认是通过下面的代码实现的

import pika# ########################## 消费者 ##########################connection = pika.BlockingConnection(pika.ConnectionParameters(        host='sydnagios'))channel = connection.channel()channel.queue_declare(queue='hello1')def callback(ch, method, properties, body):    print(" [x] Received %r" % body)    import time    time.sleep(10)    print ('ok')    ch.basic_ack(delivery_tag = method.delivery_tag)channel.basic_consume(callback,           queue='hello1',           no_ack=False)print(' [*] Waiting for messages. To exit press CTRL+C')channel.start_consuming()--------- [*] Waiting for messages. To exit press CTRL+C [x] Received b'Hello World!'ok

注意no_ack默认是Fasle,因此可以不写

ch.basic_ack如果忘记写了,后果会很严重,客户端掉线的时候,RabbitMQ会转发消息给下一个客户,但是他不会释放掉没有被ACK的消息,这样内存不被不断的吃掉。

可以通过下面命令进行debug

[root@sydnagios nagvis]# rabbitmqctl list_queues name messages_ready messages_unacknowledgedListing queues ...hello   0       0hello1  0       0kakaxi1 0       0task_queue      0       0

第二种方式是确保queue不会丢失,注意这种方式对已经创建过的queue无效!

注意客户端和服务器端申明的时候都要指定

channel.queue_declare(queue='hello', durable=True)

第三种可靠性的方法是消息的持久化,针对生产者指定delivery_mode=2,这样即使生产者那边挂了,生产者那边会重新把任务放入队列。 

channel.basic_publish(exchange='',                      routing_key='hello',                      body='Hello World!',                      properties=pika.BasicProperties(                          delivery_mode=2, # make message persistent                      ))

最后来个完整的例子

生产者

#!/usr/bin/env python# -*- coding:utf-8 -*-# Author Yuan Li#!/usr/bin/env pythonimport pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='sydnagios'))channel = connection.channel()#queue durablechannel.queue_declare(queue='task_queue', durable=True)message = ' '.join(sys.argv[1:]) or "Hello World!"channel.basic_publish(exchange='',                      routing_key='task_queue',                      body=message,                      properties=pika.BasicProperties(                         delivery_mode = 2, # make message persistent                      ))print(" [x] Sent %r" % message)connection.close()

消费者

#!/usr/bin/env python# -*- coding:utf-8 -*-# Author Yuan Li#!/usr/bin/env pythonimport pikaimport timeconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='sydnagios'))channel = connection.channel()channel.queue_declare(queue='task_queue', durable=True)print(' [*] Waiting for messages. To exit press CTRL+C')def callback(ch, method, properties, body):    print(" [x] Received %r" % body)    time.sleep(body.count(b'.'))    print(" [x] Done")    #千万别忘了,不然不会释放内存    ch.basic_ack(delivery_tag = method.delivery_tag)    #按任务繁忙分配,而不是顺序分配channel.basic_qos(prefetch_count=1)channel.basic_consume(callback,                      queue='task_queue')channel.start_consuming()

例3 发布订阅

RabbitMQ可以通过一个Exchange来同时给多个Queue发送消息,一般情况下,P(发布者)并不知道信息应该发给哪个queue,这些都是有Exchange的类型决定的。Exchange有3种常用类型

  • fanout 转发消息到所有的绑定队列

  • direct 通过一个关键字(routingkey)匹配转发消息到指定的队列

  • topic 模糊匹配转发消息到指定的队列

  • header

可以看见exchange的列表

[root@sydnagios nagvis]# rabbitmqctl list_exchangesListing exchanges ...        directamq.direct      directamq.fanout      fanoutamq.headers     headersamq.match       headersamq.rabbitmq.log        topicamq.rabbitmq.trace      topicamq.topic       topic...done.

Fanout类型

消费者

# Author:Alex Li#!/usr/bin/env pythonimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='sydnagios'))channel = connection.channel()channel.exchange_declare(exchange='logs_fanout',                         type='fanout')# 随机创建队列,exclusive=True表示当我们断开和消费者的连接,这个queue会自动删除result = channel.queue_declare(exclusive=True)queue_name = result.method.queue# 绑定channel.queue_bind(exchange='logs_fanout',                   queue=queue_name)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):    print(" [x] %r" % body)channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True)channel.start_consuming()

生产者

# Author:Alex Li#!/usr/bin/env pythonimport pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='sydnagios'))channel = connection.channel()channel.exchange_declare(exchange='logs_fanout',                         type='fanout')message = '456'#注意在fanout模式里面,routing_key为空channel.basic_publish(exchange='logs_fanout',                      routing_key='',                      body=message)print(" [x] Sent %r" % message)connection.close()

Direct类型,消息发送给和他自己的routing key同名binding key的队列,多个队列可以使用同一个binding key

direct可以指定关键字来绑定queue,比如第一个客户循环地绑定了error,info,warning3个关键字所在的queue

第二个客户只绑定了error

客户1

# Author:Alex Liimport pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='sydnagios'))channel = connection.channel()channel.exchange_declare(exchange='direct_logs_test_1',                         type='direct')result = channel.queue_declare(exclusive=True)queue_name = result.method.queueseverities = ['error', 'info', 'warning']for severity in severities:    channel.queue_bind(exchange='direct_logs_test_1',                       queue=queue_name,                       routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):    print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True)channel.start_consuming()

客户2

# Author:Alex Liimport pikaimport sysconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='sydnagios'))channel = connection.channel()channel.exchange_declare(exchange='direct_logs_test_1',                         type='direct')result = channel.queue_declare(exclusive=True)queue_name = result.method.queueseverities = ['error',]for severity in severities:    channel.queue_bind(exchange='direct_logs_test_1',                       queue=queue_name,                       routing_key=severity)print(' [*] Waiting for logs. To exit press CTRL+C')def callback(ch, method, properties, body):    print(" [x] %r:%r" % (method.routing_key, body))channel.basic_consume(callback,                      queue=queue_name,                      no_ack=True)channel.start_consuming()

生产者可以指定给哪些routingkey的queue发送信息,比如只发给info,那么只有客户2收到;如果发给error,那么两个客户都能收到

# Author:Alex Li#!/usr/bin/env pythonimport pikaconnection = pika.BlockingConnection(pika.ConnectionParameters(        host='sydnagios'))channel = connection.channel()channel.exchange_declare(exchange='direct_logs_test_1',                         type='direct')severity = 'info'message = '456'channel.basic_publish(exchange='direct_logs_test_1',                      routing_key=severity,                      body=message)print(" [x] Sent %r:%r" % (severity, message))connection.close()

在topic类型下,可以让队列绑定几个模糊的关键字,之后发送者将数据发送到exchange,exchange将传入”路由值“和 ”关键字“进行匹配,匹配成功,则将数据发送到指定队列。

  • # 表示可以匹配 0 个 或 多个 单词

  • *  表示只能匹配 一个 单词

1
2
3
发送者路由值              队列中
old.boy.python          old.
*  
-
- 
不匹配
old.boy.python          old.
#  -- 匹配

最后我们来看看RPC

简单的说就是在远程电脑执行命令然后返回结果

基本思路:

客户端发送请求(包括请求的correlation_id和reply_to队列),服务器端收到之后执行命令,返回结果到reply_to的队列里面,然后客户端从reply_to 队列读取数据

例如

result = channel.queue_declare(exclusive=True)callback_queue = result.method.queuechannel.basic_publish(exchange='',                      routing_key='rpc_queue',                      properties=pika.BasicProperties(                            reply_to = callback_queue,                            ),                      body=request)

注意properties属性里面有14个预定义的值,其中4个最为常见:

  • delivery_mode: 消息的持久化(2)

  • content_type:用于mime-type的编码,一般Json使用application/json

  • reply_to:callback的队列

  • correlation_id:关联RPC反馈信息和请求命令,每个请求都需要有唯一的值

演示源码

服务器端(需要先接受信息,再发布信息回去)

#!/usr/bin/env python# -*- coding:utf-8 -*-# Author Yuan Li#!/usr/bin/env pythonimport pika#绑定broker,创建一个队列connection = pika.BlockingConnection(pika.ConnectionParameters(        host='sydnagios'))channel = connection.channel()channel.queue_declare(queue='rpc_queue')#定义一个斐波拉契数列作为测试def fib(n):    if n == 0:        return 0    elif n == 1:        return 1    else:        return fib(n-1) + fib(n-2)        #定义一个回调函数给basic_consume使用        def on_request(ch, method, props, body):    n = int(body)    print(" [.] fib(%s)" % n)    response = fib(n)        #把结果发布回去    ch.basic_publish(exchange='',                     routing_key=props.reply_to,                     properties=pika.BasicProperties(correlation_id = \                                                         props.correlation_id),                     body=str(response))                         ch.basic_ack(delivery_tag = method.delivery_tag)#负载平衡channel.basic_qos(prefetch_count=1)#接受请求之后,自动调用on_request,内部执行函数,然后发回结果channel.basic_consume(on_request, queue='rpc_queue')print(" [x] Awaiting RPC requests")channel.start_consuming()

发布者

#!/usr/bin/env python# -*- coding:utf-8 -*-# Author Yuan Li#!/usr/bin/env pythonimport pikaimport uuidclass FibonacciRpcClient(object):    def __init__(self):            #绑定        self.connection = pika.BlockingConnection(pika.ConnectionParameters(                host='sydnagios'))        self.channel = self.connection.channel()                #生成随机队列        result = self.channel.queue_declare(exclusive=True)        self.callback_queue = result.method.queue                #指定on_response从callback_queue读取信息,阻塞状态        self.channel.basic_consume(self.on_response, no_ack=True,                                   queue=self.callback_queue)                                       #接受返回的信息    def on_response(self, ch, method, props, body):        if self.corr_id == props.correlation_id:            self.response = body                #发送请求    def call(self, n):        self.response = None                #生成一个随机值        self.corr_id = str(uuid.uuid4())                #发送两个参数 reply_to和 correlation_id        self.channel.basic_publish(exchange='',                                   routing_key='rpc_queue',                                   properties=pika.BasicProperties(                                         reply_to = self.callback_queue,                                         correlation_id = self.corr_id,                                         ),                                   body=str(n))                #等待接受返回结果        while self.response is None:            self.connection.process_data_events()        return int(self.response)        #实例化对象fibonacci_rpc = FibonacciRpcClient()print(" [x] Requesting fib(30)")#调用call,发送数据response = fibonacci_rpc.call(30)print(" [.] Got %r" % response)

结果如下:

"C:\Program Files\Python3\python.exe" C:/Users/yli/pycharmprojects/Exercise/Week11/c.py [x] Requesting fib(30) [.] Got 832040

参考资料: