English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

pythonのキュー通信:rabbitMQの使用(実例解説)

(一)、前語

メッセージキューを導入する理由は何ですか?

1.プログラムの解耦

2.パフォーマンスの向上

3.多業務ロジックの複雑さを低減

(二)、pythonでrabbit mqの操作

rabbitmqの設定、インストール、基本的な使用法は前節の記事を参照してください。再述しません。

pythonでrabbitmqを使用するには、pikaモジュールをインストールする必要があります。直接pipでインストールしてください:

pip install pika

1.最も簡単なrabbitmq プロダクタとコンサーマーの対話:

producer:

#Author :ywq
import pika
auth = pika.PlainCredentials('ywq', 'qwe') #save auth info
connection = pika.BlockingConnection(pika.ConnectionParameters(
  '192.168.0.158',5672,'/',auth)) #连接到Rabbit
channel = connection.channel() #create channel
channel.queue_declare(queue='hello') #声明队列
#n RabbitMQ 消息不能直接发送到队列,它总是需要通过交换机。
channel.basic_publish(exchange='',
   routing_key='hello',
   body='Hello World!') #消息内容是消息体
print(" [x] 发送 'Hello World!'")
connection.close()

consumer:

#Author :ywq
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(
  '192.168.0.158',5672,'/',auth)) #连接到Rabbit
channel = connection.channel()  #创建通道
channel.queue_declare(queue='hello') #声明队列
def callback(ch, method, properties, body):
 print(" [x] 收到 %r" % body)
channel.basic_consume(callback,
   queue='hello',
   no_ack=True)
print(' [*] 等待消息。要退出请按CTRL+C')
channel.start_consuming()

在消息传递消费过程中,可以在Rabbit Web管理页面实时查看队列消息信息。

2.持久化的消息队列,避免因宕机等意外情况导致消息队列丢失。

消费者端无需更改,在生产者端代码中添加两个属性,分别使消息持久化、队列持久化,只选择其一仍会出现消息丢失,必须同时开启:

delivery_mode=2 # make msg persistent
durable=True

属性插入位置见如下代码(生产者端):

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
  '192.168.0.158',5672,'/',auth_info
 ))
channel=connection.channel()
channel.queue_declare(queue='test1#durable=True, 持久化队列
msg=''.join(sys.argv[1:]) or 'Hello'
channel.basic_publish(
 exchange='',
 routing_key='test1',
 body=msg,
 properties=pika.BasicProperties(
  delivery_mode=2 # make msg persistent
 )
)
print('Send done:',msg)
connection.close()

3.公平配信

複数のconsumerがある場合、デフォルトでRabbitMQはメッセージをループで送信しますが、あるconsumerは速く、あるconsumerは遅く、リソースの使用がよりバランスが取れるように、ack確認メカニズムを導入します。consumerがメッセージを消費した後、RabbitMQにackを送信します。未ackのメッセージの数が指定された許容数を超えた場合、そのconsumerにメッセージを送信するのを停止し、他のconsumerに送信します。

producer端のコードは変更しないでください。consumer端のコードに2つの属性を挿入する必要があります:

channel.basic_qos(prefetch_count= *) # define the max non_ack_count
channel.basic_ack(delivery_tag=deliver.delivery_tag) # send ack to rabbitmq

属性の挿入位置は以下のコード(consumer端)で確認してください:

#Author :ywq
import pika,time
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.queue_declare(queue='test2',durable=True)
def callback(chann,deliver,properties,body):
 print('Recv:',body)
 time.sleep(5)
 chann.basic_ack(delivery_tag=deliver.delivery_tag) #send ack to rabbit
channel.basic_qos(prefetch_count=1)
'''
注意、no_ack=False 注意、ここでのno_ackの種類は、RabbitMQにこのconsumerのqueueがackを返すかどうかを伝えるためのものであり、ackを返すにはcallback内で定義する必要があります。
prefetch_count=1,未ackのmsgの数が1個の場合、このconsumerはmsgを受け取らなくなり、この設定はchannel.basic_consumeの上に書かれる必要があります。さもなければnon_ack状況が発生します。
'''
channel.basic_consume(
 callback,
 queue='test2'
)
channel.start_consuming()

第3章 メッセージの配信/購読

上記のパターンは、producerが一度送信するとconsumerが一度受け取る構造ですが、producerが一度送信して、複数の関連するconsumerが同時に受け取るようにできますか?もちろんです、RabbitMQはメッセージの配信と購読をサポートしており、3つのモードをサポートしています。これらはコンポーネントexchange経由で転送されます。3以下の種類のパターン:

fanout: このexchangeにバインドされたすべてのqueueがメッセージを受け取れる、ブロードキャストに似ています。

direct: メッセージを受け取れるユニークなqueueはroutingKeyとexchangeによって決定され、そのqueueにバインドされたconsumerにメッセージを送信し、マルチキャストに似ています。

topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息,类似前缀列表匹配路由。

1.fanout

publish端(producer):

#Author :ywq
import pika,sys,time
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(exchange='hello',
    exchange_type='fanout'
    )
msg=''.join(sys.argv[1:]) or 'Hello world %s' %time.time()
channel.basic_publish(
 exchange='hello',
 routing_key='',
 body=msg,
 properties=pika.BasicProperties(
 delivery_mode=2
 )
)
print('send done')
connection.close()

subscribe端(consumer):

#Author :ywq
import pika
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(
 exchange='hello',
 exchange_type='fanout'
)
random_num=channel.queue_declare(exclusive=True) #随机与rabbit建立一个queue,comsumer断开后,该queue立即删除释放
queue_name=random_num.method.queue
channel.basic_qos(prefetch_count=1)
channel.queue_bind(
 queue=queue_name,
 exchange='hello'
)
def callback(chann,deliver,properties,body):
 print('Recv:',body)
 chann.basic_ack(delivery_tag=deliver.delivery_tag) #send ack to rabbit
channel.basic_consume(
 callback,
 queue=queue_name,
)
channel.start_consuming()

实现producer一次发送,多个关联consumer接收。

使用exchange模式时:

1.producer端不再申明queue,直接申明exchange

2.consumer端仍需绑定队列并指定exchange来接收message

3.consumer最好创建随机queue,使用完后立即释放。

随机队列名在web下可以检测到:

2.direct

使用exchange同时consumer有选择性地接收消息。队列绑定关键字,producer将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列,consumer相应接收。即在fanout基础上增加了routing key。

producer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(exchange='direct_log',
   exchange_type='direct',
   )
while True:
 route_key=input('Input routing key:')
 msg=''.join(sys.argv[1:]) or 'Hello'
 channel.basic_publish(
 exchange='direct_log',
 routing_key=route_key,
 body=msg,
 properties=pika.BasicProperties(
  delivery_mode=2
 )
 )
connection.close()

consumer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
))
channel=connection.channel()
channel.exchange_declare(
 exchange='direct_log',
 exchange_type='direct',
)
queue_num=channel.queue_declare(exclusive=True)
queue_name=queue_num.method.queue
route_key=input('Input routing key:')
channel.queue_bind(
 queue=queue_name,
 exchange='direct_log',
 routing_key=route_key
)
def callback(chann,deliver,property,body):
 print('Recv:[level:%s],[msg:%s]' %(route_key,body))
 chann.basic_ack(delivery_tag=deliver.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
 callback,
 queue=queue_name
)
channel.start_consuming()

複数のconsumerを同時に開始し、そのうち2つがnoticeを受け取り、2つがwarningを受け取り、以下のようになります:

3.topic

directと比較して、topicは(consumer側で指定する)模糊一致の動作方式を実現できます。routing keyに指定されたキーワードが含まれていれば、そのmsgをバインドされたqueueに送信します。

rabbitmqのワイルドカードルール:

シンボル「#」は1つ以上の単語を一致させます、シンボル「」は1つの単語を一致させます。したがって「abc.#」は「abc.m.n」に一致しますが、「abc.*‘' だけが「abc.m」に一致します。‘.' は区切り記号です。ワイルドカードで一致させる場合、‘.' を区切り記号に使用する必要があります。

producer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
 )
)
channel=connection.channel()
channel.exchange_declare(exchange='topic_log',
   exchange_type='topic',
   )
while True:
 route_key=input('Input routing key:')
 msg=''.join(sys.argv[1:]) or 'Hello'
 channel.basic_publish(
 exchange='topic_log',
 routing_key=route_key,
 body=msg,
 properties=pika.BasicProperties(
  delivery_mode=2
 )
 )
connection.close()

consumer:

#Author :ywq
import pika,sys
auth_info=pika.PlainCredentials('ywq','qwe')
connection=pika.BlockingConnection(pika.ConnectionParameters(
 '192.168.0.158',5672,'/',auth_info
))
channel=connection.channel()
channel.exchange_declare(
 exchange='topic_log',
 exchange_type='topic'
)
queue_num=channel.queue_declare(exclusive=True)
queue_name=queue_num.method.queue
route_key=input('Input routing key:')
channel.queue_bind(
 queue=queue_name,
 exchange='topic_log',
 routing_key=route_key
)
def callback(chann,deliver,property,body):
 print('Recv:[type:%s],[msg:%s]' %(route_key,body))
 chann.basic_ack(delivery_tag=deliver.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(
 callback,
 queue=queue_name
)
channel.start_consuming()

実行結果:

rabbitmq 三つの publish/subscribe モデルの簡単な紹介が完了しました。

以上のpythonのキュー通信:rabbitMQの使用(実例説明)が編集者が皆さんに提供する全ての内容です。皆さんに参考になれば幸いですし、もっとサポートしていただければと思います。

声明:本記事の内容はインターネットから取得しており、著作権者はすべて所有しています。インターネットユーザーにより自発的に提供され、アップロードされたコンテンツであり、本サイトは所有権を持ちません。また、人工編集は行われておらず、関連する法的責任を負いません。著作権侵害が疑われる内容を見つけた場合は、以下のメールアドレスにご連絡ください:notice#oldtoolbag.com(メールを送信する際には、#を@に置き換えてください。報告を行い、関連する証拠を提供してください。一旦確認が取れた場合、本サイトは直ちに侵害が疑われる内容を削除します。)

基礎教程
おすすめ