English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية
(一)、前語
メッセージキューを導入する理由は何ですか?
1.プログラムの解耦
2.パフォーマンスの向上
3.多業務ロジックの複雑さを低減
(二)、pythonでrabbit mqの操作
rabbitmqの設定、インストール、基本的な使用法は前節の記事を参照してください。再述しません。
pythonでrabbitmqを使用するには、pikaモジュールをインストールする必要があります。直接pipでインストールしてください:
pip install pika
1.最も簡単なrabbitmq プロダクタとコンサーマーの対話:
producer:
#Author :ywq import pika auth = pika.PlainCredentials('ywq', 'qwe') #save auth info connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth)) #连接到Rabbit channel = connection.channel() #create channel channel.queue_declare(queue='hello') #声明队列 #n RabbitMQ 消息不能直接发送到队列,它总是需要通过交换机。 channel.basic_publish(exchange='', routing_key='hello', body='Hello World!') #消息内容是消息体 print(" [x] 发送 'Hello World!'") connection.close()
consumer:
#Author :ywq import pika connection = pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth)) #连接到Rabbit channel = connection.channel() #创建通道 channel.queue_declare(queue='hello') #声明队列 def callback(ch, method, properties, body): print(" [x] 收到 %r" % body) channel.basic_consume(callback, queue='hello', no_ack=True) print(' [*] 等待消息。要退出请按CTRL+C') channel.start_consuming()
在消息传递消费过程中,可以在Rabbit Web管理页面实时查看队列消息信息。
2.持久化的消息队列,避免因宕机等意外情况导致消息队列丢失。
消费者端无需更改,在生产者端代码中添加两个属性,分别使消息持久化、队列持久化,只选择其一仍会出现消息丢失,必须同时开启:
delivery_mode=2 # make msg persistent durable=True
属性插入位置见如下代码(生产者端):
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info )) channel=connection.channel() channel.queue_declare(queue='test1#durable=True, 持久化队列 msg=''.join(sys.argv[1:]) or 'Hello' channel.basic_publish( exchange='', routing_key='test1', body=msg, properties=pika.BasicProperties( delivery_mode=2 # make msg persistent ) ) print('Send done:',msg) connection.close()
3.公平配信
複数のconsumerがある場合、デフォルトでRabbitMQはメッセージをループで送信しますが、あるconsumerは速く、あるconsumerは遅く、リソースの使用がよりバランスが取れるように、ack確認メカニズムを導入します。consumerがメッセージを消費した後、RabbitMQにackを送信します。未ackのメッセージの数が指定された許容数を超えた場合、そのconsumerにメッセージを送信するのを停止し、他のconsumerに送信します。
producer端のコードは変更しないでください。consumer端のコードに2つの属性を挿入する必要があります:
channel.basic_qos(prefetch_count= *) # define the max non_ack_count channel.basic_ack(delivery_tag=deliver.delivery_tag) # send ack to rabbitmq
属性の挿入位置は以下のコード(consumer端)で確認してください:
#Author :ywq import pika,time auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.queue_declare(queue='test2',durable=True) def callback(chann,deliver,properties,body): print('Recv:',body) time.sleep(5) chann.basic_ack(delivery_tag=deliver.delivery_tag) #send ack to rabbit channel.basic_qos(prefetch_count=1) ''' 注意、no_ack=False 注意、ここでのno_ackの種類は、RabbitMQにこのconsumerのqueueがackを返すかどうかを伝えるためのものであり、ackを返すにはcallback内で定義する必要があります。 prefetch_count=1,未ackのmsgの数が1個の場合、このconsumerはmsgを受け取らなくなり、この設定はchannel.basic_consumeの上に書かれる必要があります。さもなければnon_ack状況が発生します。 ''' channel.basic_consume( callback, queue='test2' ) channel.start_consuming()
第3章 メッセージの配信/購読
上記のパターンは、producerが一度送信するとconsumerが一度受け取る構造ですが、producerが一度送信して、複数の関連するconsumerが同時に受け取るようにできますか?もちろんです、RabbitMQはメッセージの配信と購読をサポートしており、3つのモードをサポートしています。これらはコンポーネントexchange経由で転送されます。3以下の種類のパターン:
fanout: このexchangeにバインドされたすべてのqueueがメッセージを受け取れる、ブロードキャストに似ています。
direct: メッセージを受け取れるユニークなqueueはroutingKeyとexchangeによって決定され、そのqueueにバインドされたconsumerにメッセージを送信し、マルチキャストに似ています。
topic:所有符合routingKey(此时可以是一个表达式)的routingKey所bind的queue可以接收消息,类似前缀列表匹配路由。
1.fanout
publish端(producer):
#Author :ywq import pika,sys,time auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare(exchange='hello', exchange_type='fanout' ) msg=''.join(sys.argv[1:]) or 'Hello world %s' %time.time() channel.basic_publish( exchange='hello', routing_key='', body=msg, properties=pika.BasicProperties( delivery_mode=2 ) ) print('send done') connection.close()
subscribe端(consumer):
#Author :ywq import pika auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare( exchange='hello', exchange_type='fanout' ) random_num=channel.queue_declare(exclusive=True) #随机与rabbit建立一个queue,comsumer断开后,该queue立即删除释放 queue_name=random_num.method.queue channel.basic_qos(prefetch_count=1) channel.queue_bind( queue=queue_name, exchange='hello' ) def callback(chann,deliver,properties,body): print('Recv:',body) chann.basic_ack(delivery_tag=deliver.delivery_tag) #send ack to rabbit channel.basic_consume( callback, queue=queue_name, ) channel.start_consuming()
实现producer一次发送,多个关联consumer接收。
使用exchange模式时:
1.producer端不再申明queue,直接申明exchange
2.consumer端仍需绑定队列并指定exchange来接收message
3.consumer最好创建随机queue,使用完后立即释放。
随机队列名在web下可以检测到:
2.direct
使用exchange同时consumer有选择性地接收消息。队列绑定关键字,producer将数据根据关键字发送到消息exchange,exchange根据 关键字 判定应该将数据发送至指定队列,consumer相应接收。即在fanout基础上增加了routing key。
producer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare(exchange='direct_log', exchange_type='direct', ) while True: route_key=input('Input routing key:') msg=''.join(sys.argv[1:]) or 'Hello' channel.basic_publish( exchange='direct_log', routing_key=route_key, body=msg, properties=pika.BasicProperties( delivery_mode=2 ) ) connection.close()
consumer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info )) channel=connection.channel() channel.exchange_declare( exchange='direct_log', exchange_type='direct', ) queue_num=channel.queue_declare(exclusive=True) queue_name=queue_num.method.queue route_key=input('Input routing key:') channel.queue_bind( queue=queue_name, exchange='direct_log', routing_key=route_key ) def callback(chann,deliver,property,body): print('Recv:[level:%s],[msg:%s]' %(route_key,body)) chann.basic_ack(delivery_tag=deliver.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume( callback, queue=queue_name ) channel.start_consuming()
複数のconsumerを同時に開始し、そのうち2つがnoticeを受け取り、2つがwarningを受け取り、以下のようになります:
3.topic
directと比較して、topicは(consumer側で指定する)模糊一致の動作方式を実現できます。routing keyに指定されたキーワードが含まれていれば、そのmsgをバインドされたqueueに送信します。
rabbitmqのワイルドカードルール:
シンボル「#」は1つ以上の単語を一致させます、シンボル「」は1つの単語を一致させます。したがって「abc.#」は「abc.m.n」に一致しますが、「abc.*‘' だけが「abc.m」に一致します。‘.' は区切り記号です。ワイルドカードで一致させる場合、‘.' を区切り記号に使用する必要があります。
producer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info ) ) channel=connection.channel() channel.exchange_declare(exchange='topic_log', exchange_type='topic', ) while True: route_key=input('Input routing key:') msg=''.join(sys.argv[1:]) or 'Hello' channel.basic_publish( exchange='topic_log', routing_key=route_key, body=msg, properties=pika.BasicProperties( delivery_mode=2 ) ) connection.close()
consumer:
#Author :ywq import pika,sys auth_info=pika.PlainCredentials('ywq','qwe') connection=pika.BlockingConnection(pika.ConnectionParameters( '192.168.0.158',5672,'/',auth_info )) channel=connection.channel() channel.exchange_declare( exchange='topic_log', exchange_type='topic' ) queue_num=channel.queue_declare(exclusive=True) queue_name=queue_num.method.queue route_key=input('Input routing key:') channel.queue_bind( queue=queue_name, exchange='topic_log', routing_key=route_key ) def callback(chann,deliver,property,body): print('Recv:[type:%s],[msg:%s]' %(route_key,body)) chann.basic_ack(delivery_tag=deliver.delivery_tag) channel.basic_qos(prefetch_count=1) channel.basic_consume( callback, queue=queue_name ) channel.start_consuming()
実行結果:
rabbitmq 三つの publish/subscribe モデルの簡単な紹介が完了しました。
以上のpythonのキュー通信:rabbitMQの使用(実例説明)が編集者が皆さんに提供する全ての内容です。皆さんに参考になれば幸いですし、もっとサポートしていただければと思います。
声明:本記事の内容はインターネットから取得しており、著作権者はすべて所有しています。インターネットユーザーにより自発的に提供され、アップロードされたコンテンツであり、本サイトは所有権を持ちません。また、人工編集は行われておらず、関連する法的責任を負いません。著作権侵害が疑われる内容を見つけた場合は、以下のメールアドレスにご連絡ください:notice#oldtoolbag.com(メールを送信する際には、#を@に置き換えてください。報告を行い、関連する証拠を提供してください。一旦確認が取れた場合、本サイトは直ちに侵害が疑われる内容を削除します。)