English | 简体中文 | 繁體中文 | Русский язык | Français | Español | Português | Deutsch | 日本語 | 한국어 | Italiano | بالعربية

python 並行2asyncio を使って並行処理を行う

asyncio

Python 2の時代、高性能なネットワークプログラミングはTwisted、Tornado、Geventのこれら3つのライブラリを使用していましたが、そのアシストロニックコードは互換性がなく、移植性もありませんでした。前節で述べたように、GvanrossumはPython 3 ネイティブな生成器ベースのコルーチンライブラリを実装し、アシストロニックIOのサポートが直接内蔵されています。これがasyncioで、Python 3.4標準ライブラリに導入されました。

asyncio このパッケージはイベントループ駆動のコルーチンで並行処理を実現しています。

asyncioパッケージは標準ライブラリに導入される前に「Tulip」(郁金香)というコードネームが付けられていたため、オンラインで情報を検索すると、この花の名前がよく見られます。

イベントループとは何ですか?63;

wikiには「イベントループは、プログラムがイベントやメッセージを待つためのプログラミングアーキテクチャ」とあります。基本的には、イベントループは「Aが発生したときにBを実行する」ということです。または、最もシンプルな例として、ブラウザに存在するJavaScriptのイベントループを説明することができます。何かをクリックすると(「Aが発生したときに」)、そのクリックアクションがJavaScriptのイベントループに送られ、onclickコールバックが登録されているかどうかを確認し、クリックアクションの詳細情報とともにコールバックが実行されます。登録されているコールバック関数があれば、クリックアクションの詳細情報とともに実行されます。イベントループは、常にイベントを待ち受け、それらのイベントをループを通じて送信し、それらのイベントに対処するための方法として虚幻と考えられています。

Pythonでは、標準ライブラリに加わる前に「Tulip」(郁金香)というコードネームが付けられていたasyncioパッケージが使用されます。asyncioはネットワークサービスの問題を解決するために重点を置いており、ここでのイベントループはソケット(socket)からのI/Oイベントから来ています。/Oが読み込まれる準備ができたら/または'selectorsモジュール'を通じて「Aが発生したときに」のように書ける。/イベントループは、他のスレッドや子プロセスでコードを実行し、イベントループを調節機制(例えば、協力的マルチタスク)として使用されることがよくあります。PythonのGILを理解している場合、GILを解放する必要がある場所でイベントループは非常に有用です。

スレッドとコルーチン

まずはthreadingモジュールとasyncioパッケージで実現された2つのコードを確認します。

# sinner_thread.py
import threading
import itertools
import time
import sys
class Signal: # このクラスは外部からスレッドを制御するための可変オブジェクトを定義します
 go = True
def spin(msg, signal): # この関数は別のスレッドで実行されます、signalパラメータは先に定義されたSignalクラスのインスタンスです
 write, flush = sys.stdout.write, sys.stdout.flush
 for char in itertools.cycle('|')/-\\'): # itertools.cycle 関数は指定されたシーケンスから元素を繰り返し生成します
  status = char + ' ' + msg
  write(status)
  flush()
  write('\x0)8' * len(status)) # 使用退格符把光标移回行首
  time.sleep(.1) # 0.1 秒に一度リフレッシュ
  if not signal.go: # go属性がTrueでない場合、ループから退出
   break
 write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除狀態消息,把光標移回開頭
def slow_function(): # 耗時操作のシミュレーション
 # 假裝等待I/O一個時段
 time.sleep(3) # sleepを呼び出すとメインスレッドがブロックされ、これによりGILを解放し、従属スレッドを作成するためです
 return 42
def supervisor(): # この関数は従属スレッドを設定し、スレッドオブジェクトを表示し、実行時間を計算し、最後にプロセスを終了します
 signal = Signal()
 spinner = threading.Thread(target=spin,
        args=('thinking!', signal))
 print('spinner object:', spinner) # スレッドオブジェクトを表示し、出力 spinner object: <Thread(Thread-1, initial)>
 spinner.start() # 次要プロセスを開始します
 result = slow_function() # slow_function の行を実行し、メインスレッドをブロック。同時にスレッドをアニメーション形式で回転させます
 signal.go = False
 spinner.join() # spinner スレッドが終わるまで待機します
 return result
def main():
 result = supervisor() 
 print('Answer', result)
if __name__ == '__main__':
 main()

実行してみると、結果はこんな感じになります:

これはアニメーションです、「thinking」の前に線が動いています(スクリーンキャプチャのために sleep の時間を長くしました)

python はスレッドを終了するAPIを提供していないため、スレッドを閉じるにはスレッドにメッセージを送信する必要があります。ここでは signal.go 属性を使用します:メインスレッドでそれを False に設定すると、spinner スレッドがそれを受け取り、終了します

今度は asyncio パッケージを使用するバージョンを見てみましょう:

# spinner_asyncio.py
# コルーチンを通じてアニメーション形式でテキストのローテーション指針を表示します
import asyncio
import itertools
import sys
@asyncio.coroutine # asyncio処理にまかせるコルーチンには @asyncio.coroutine デコレータを使用します
def spin(msg):
 write, flush = sys.stdout.write, sys.stdout.flush
 for char in itertools.cycle('|')/-\\'): # itertools.cycle 関数は指定されたシーケンスから元素を繰り返し生成します
  status = char + ' ' + msg
  write(status)
  flush()
  write('\x0)8' * len(status)) # 使用退格符把光标移回行首
  try:
   yield from asyncio.sleep(0.)1) # 使用 yield from asyncio.sleep(0.)1) 代替 time.sleep(.)1)、この休眠はイベントループをブロックしません
  except asyncio.CancelledError: # 如果 spin 函數醒後拋出 asyncio.CancelledError 異常,其原因是發出了取消請求
   break
 write(' ' * len(status) + '\x08' * len(status)) # 使用空格清除狀態消息,把光標移回開頭
@asyncio.coroutine
def slow_function(): # 5 現在此函數是協程,使用休眠假裝進行I/O 操作時,使用 yield from 繼續執行事件循環
 # 假裝等待I/O一個時段
 yield from asyncio.sleep(3) # 此表達式把控制權交給主循環,在休眠結束後回复這個協程
 return 42
@asyncio.coroutine
def supervisor(): #這個函數也是協程,因此可以使用 yield from 駅動 slow_function
 spinner = asyncio.async(spin('thinking!')) # asyncio.async() 函數排定協程的運行時間,使用一個 Task 對象包裝spin 協程,並立即返回
 print('spinner object:', spinner) # Task 對象,輸出類似 spinner object: <Task pending coro=<spin() running at spinner_asyncio.py:6>>
 # 驅動slow_function() 函數,結束後,獲取返回值。同時事件循環繼續運行,
 # 因為slow_function 函數最後使用yield from asyncio.sleep(3) 表達式把控制權交給主循環
 result = yield from slow_function()
 # Task 對象可以取消;取消後會在協程當前暫停的yield處拋出 asyncio.CancelledError 異常
 # 協程可以捕獲這個異常,也可以延遲取消,甚至拒絕取消
 spinner.cancel()
 return result
def main():
 loop = asyncio.get_event_loop() # 获取事件循环引用
 # 驱動supervisor 協程,讓它運行完畢;這個協程的返回值是這次調用的返回值
 result = loop.run_until_complete(supervisor())
 loop.close()
 print('Answer', result)
if __name__ == '__main__':
 main()

メインスレッドをブロックしてイベントループや全体のアプリケーションを凍結することを意図していない限り、asyncioコルーチン内でtime.sleep()を使用しないでください。

コルーチンが一定の期間何もしない場合、yield from asyncio.sleep(DELAY)を使用する必要があります。

asyncio.coroutineデコレータを使用することは強制ではありませんが、コードでコルーチンを明確に示すために推奨されます。なぜなら、コルーチンが値を出力していない場合、コルーチンはガベージコレクションを行います(これは操作が完了していないため、欠陥がある可能性があることを意味します)し、警告を出すことができます。このデコレータはコルーチンを予測的に実行しません。

これら2つのコードの実行結果は基本的に同じですが、現在はこれら2つのコードの核心コードsupervisorの主な違いを見てみましょう:

  1. asyncio.Taskオブジェクトはthreading.Threadオブジェクトとほぼ同等です(Taskオブジェクトは実行時多任務のライブラリの緑色スレッドのようなものです)。
  2. Taskオブジェクトはコルーチンを駆動し、Threadオブジェクトは呼び出し可能なオブジェクトを呼び出すために使用されます。
  3. Taskオブジェクトは自分でインスタンス化することはできず、コルーチンをasyncio.async(...)関数やloop.create_task(...)メソッドに渡すことで取得します。
  4. 取得したTaskオブジェクトは既に実行時間が決まっています;Threadインスタンスはstartメソッドを呼び出して、実行を明示的に知らせなければなりません。
  5. スレッド版のsupervisor関数では、slow_functionは通常の関数であり、スレッドが直接呼び出しますが、アシスト版のslow_function関数はコルーチンであり、yield fromで動作します。
  6. 外部からスレッドを終了するAPIはありません。なぜなら、スレッドはいつでも中断される可能性があるからです。タスクを終了するには、Task.cancel()インスタンスメソッドを使用し、コルーチン内でCancelledError例外を投げることができます。コルーチンは、暂停したyieldでこの例外をキャッチして、終了要求を処理できます。
  7. supervisorコルーチンは、main関数でloop.run_until_completeメソッドを実行する必要があります。

コルーチンとスレッドと比べて重要な利点の一つは、スレッドはロックを保持してプログラムの重要な部分を保護し、多段操作の実行中に中断を防ぎ、山水が於曉状態になるのを防ぐために、コルーチンはデフォルトで保護を行います。私たちは明示的にyieldまたはyield fromを使用してコントロール権を放棄しなければ、プログラムの残りの部分が実行されません。

asyncio.Future:意図的にブロックしない

asynci.Futureクラスとconcurrent.futures.Futureクラスのインターフェースは基本的に同じですが、実現方法が異なり、交換できません。

前回[pythonの並行 1:futuresを使用して並行処理する()前に、concurrent.futures.Futureのfutureについて紹介しました。concurrent.futures.Futureでは、futureは単に某物の実行結果をスケジュールするものであり、asyncioパッケージでは、BaseEventLoop.create_task(...)メソッドがコルーチンを受け取り、その実行時間を設定し、asyncio.Taskインスタンス(asyncio.Futureクラスのインスタンス、TaskはFutureのサブクラスであり、コルーチンを包装するために使用されます。(concurrent.futures.Futureでは、同様の操作はExecutor.submit(...)です)。

concurrent.futures.Futureクラスと同様に、asyncio.Futureクラスも以下を提供しています:

  1. .done()は布尔値を返し、Futureが既に実行されているかどうかを示します。
  2. .add_done_callback()メソッドには1つのパラメータがあり、その型は呼び出し可能オブジェクトです。Futureが実行終了した後にこのオブジェクトにコールバックが行われます。
  3. .result()メソッドにはパラメータがなく、したがってタイムアウト時間を指定できません。.result()メソッドを呼び出し時がまだ実行されていない場合、asyncio.InvalidStateError例外が投げられます。

対応するconcurrent.futures.FutureクラスのFutureが実行終了した後にresult()を呼び出すと、呼び出し可能オブジェクトの結果または実行中に発生した例外が返されます。Futureが実行終了していなかった場合にf.result()メソッドを呼び出すと、呼び出し元のスレッドがブロックされ、結果が返されるまで待機します。この時、resultメソッドはtimeoutパラメータを受け取ることができ、指定された時間内にFutureが実行終了しなかった場合、TimeoutError例外が投げられます。

asyncio.Futureを使用する際には、通常yield fromを使用して結果を取得し、result()メソッドを使用しません。yield from表現式は、暂停中のコルーチンで返値を生成し、実行プロセスに戻ります。

asyncio.Futureクラスの目的はyield fromと一緒に使用されるため、以下のメソッドは通常使用されません:

  1. my_future.add_down_callback(...),を呼び出す必要はありません。なぜなら、futureの実行終了後に行いたい操作をコルーチンに直接配置して、yield from my_future表現の後に置くことができます。(なぜならコルーチンは関数を一時停止および再開できるからです)
  2. my_future.result()を呼び出す必要はありません。なぜなら、yield fromで生成された結果が(result = yield from my_future)であるからです。

asyncioパッケージでは、yield fromを使用してasyncio.Futureオブジェクトから結果を出力できます。これは、以下のように書けることを意味します:

res = yield from foo() # fooはコルーチン関数またはFutureまたはtaskインスタンスを返す通常の関数でできます

asyncio.async(...)* 関数

asyncio.async(coro_or_future, *, loop=None)

この関数はコルーチンとFutureを統一しています:第一引数はどちらでも指定できます。FutureまたはTaskオブジェクトの場合はそのまま返しますが、コルーチンの場合はasync関数が自動的にloop.create_task(...)メソッドを呼び出してTaskオブジェクトを作成します。loop引数はオプションで、イベントループを渡します;指定しない場合、async関数はasyncio.get_event_loop()関数を呼び出してループオブジェクトを取得します。

BaseEventLoop.create_task(coro)

このメソッドはコルーチンの実行時間を決定し、asyncio.Taskオブジェクトを返します。BaseEventLoopのサブクラスで呼び出された場合、返されるオブジェクトはTaskクラスと互換性のある外部ライブラリのクラスのインスタンスである可能性があります。

BaseEventLoop.create_task()メソッドはPython3.4.2 以降のバージョンで使用可能です。Python3.3 asyncio.async(...)関数のみを使用できます。
Pythonのコンソールまたは小規模なテストスクリプトでfutureおよびコルーチンを試験するために、以下のスニペットを使用できます:

import asyncio
def run_sync(coro_or_future):
 loop = asyncio.get_event_loop()
 return loop.run_until_complete(coro_or_future)
a = run_sync(some_coroutine())

asyncioとaiohttpパッケージを使用してダウンロード

asyncioの基本的な知識を理解した今、前回の[python并发]の重写にasyncioを使用する時が来ました 1:futuresを使用して並列に処理するために...] 国旗のダウンロードスクリプトを作成しました。

まず、以下のコードを見てみましょう:

import asyncio
import aiohttp # aiohttpをインストールする必要があります(pip install aiohttp)
from flags import save_flag, show, main, BASE_URL
@asyncio.coroutine # 協程はasyncio.coroutineデコレータを使用するべきであることを知っています
def get_flag(cc):
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
  # ブロッキング操作はコルーチンで実現され、クライアントコードはyield fromを使用して責任をコルーチンに委譲し、非同期操作を実行します
 resp = yield from aiohttp.request('GET', url) 
 # 読み取りも非同期操作です
 image = yield from resp.read()
 return image
@asyncio.coroutine
def download_one(cc): # このファンクションもコルーチンでなければなりません。なぜなら、yield fromを使用しているからです
 image = yield from get_flag(cc) 
 show(cc)
 save_flag(image, cc.lower()) + '.gif')}
 return cc
def download_many(cc_list):
 loop = asyncio.get_event_loop() # イベントループの下層実装の参照を取得します
 to_do = [download_one(cc) for cc in sorted(cc_list)] # 各国旗を取得するためにdownload_oneを呼び出し、生成器オブジェクトのリストを構築します
 # ファンクション名がwaitだが、ブロッキング型のファンクションではありません。waitはコルーチンであり、渡されたすべてのコルーチンが終了した後に終了します
 wait_coro = asyncio.wait(to_do)
 res, _ = loop.run_until_complete(wait_coro) # イベントループを実行し、wait_coroが終了するまで待ちます;イベントループの実行中に、このスクリプトはここでブロッキングします。
 loop.close() # イベントループを閉じます
 return len(res)
if __name__ == '__main__':
 main(download_many)

このコードの動作の簡単な説明は以下の通りです:

  1. download_many ファンクションは、download_one ファンクションが生成するいくつかのコルーチンオブジェクトを処理するために一つのイベントループを取得します
  2. asyncio イベントループは一度に各コルーチンを活性化します
  3. クライアントコードのコルーチン(get_flag)は、ライブラリ内のコルーチン(aiohttp.request)に責任を委譲するために yield from を使用し、コルーチンが実行される前にイベントループに制御権を返します
  4. イベントループは、ブロッキング操作が完了した後に通知を得るために、ベースの回调APIを使用します。
  5. 通知を受け取った後、メインループは結果を一時停止しているコルーチンに送信します。
  6. コルーチンは次のyield from表現に進行し、例えばget_flag関数のyield from resp.read()。イベントループが再び制御権を得て、ステップ2に戻ります。4~6ステップを進め、ループが終了するまで。

download_many関数では、asyncio.wait(...)関数を使用しており、この関数はコルーチンであり、コルーチンのパラメータはfutureまたはコルーチンで構成された可迭代オブジェクトです;waitはそれぞれのコルーチンをTaskオブジェクトに包装します。最終的な結果は、waitが処理するすべてのオブジェクトがFutureクラスのインスタンスに変換されます。

waitはコルーチン関数であり、そのため、コルーチンまたは生成器オブジェクトを返します;waite_coro変数にはこの種のオブジェクトが格納されています。

waitはコルーチン関数であり、そのため、コルーチンまたは生成器オブジェクトを返します;waite_coro変数にはこの種のオブジェクトが格納されています。loop.run_until_completeメソッドのパラメータはfutureまたはコルーチンです。コルーチンの場合、run_until_completeメソッドはwait関数と同様に、コルーチンをTaskオブジェクトに包装します。ここでrun_until_completeメソッドはwait_coroをTaskオブジェクトに包装し、yield fromで動作します。wait_coroが実行終了すると、2つのパラメータが返されます。第1パラメータは終了したfuture、第2パラメータは未完了のfutureです。

<section class="caption">wait</section>には、timeoutとreturn_whenという2つの名前付きパラメータがあります。これらが設定されている場合、未完了のfutureが返される可能性があります。

おそらく気づいたかもしれませんが、get_flags 関数を再定義したのは、以前に使用していたrequestsライブラリがブロッキング型I/O操作。asyncioパッケージを使用するには、関数を非同期版に変更する必要があります。

小技

コルーチンを使った後、コードが理解しにくいと感じる場合は、Pythonの父(Guido van Rossum)の提案に従って、yield from を存在しないと仮定することができます。

以下のこのコードを例にして:

@asyncio.coroutine
def get_flag(cc):
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = yield from aiohttp.request('GET', url) 
 image = yield from resp.read()
 return image
# yield form を取り除く
def get_flag(cc):
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = aiohttp.request('GET', url) 
 image = resp.read()
 return image
# より明確になりましたか

知識点

asyncio パッケージのAPIで yield from を使用する際には、注意すべき細かい点があります:

asyncio パッケージを使用する際には、asyncio自体が駆動するコルーチン(委譲生成器)が含まれるアスペクトコードを書きます。生成器は最終的にasyncioパッケージまたはサードパーティライブラリのコルーチンに責任を委譲します。この処理方法はパイプラインを構築し、asyncioイベントループが下層のアスペクトI/Oを駆動するようにします。/Oのライブラリ関数。

ブロッキングコールを避けます。

まず、この図を見てください。この図はコンピュータが異なるストレージメディアからデータを読み取る遅延を示しています:

この図を通じて、ブロッキングコールがCPUに対してどれほどの無駄であるかを確認できます。ブロッキングコールがアプリケーションの全体をブロックするのを避ける方法は何でしょうか?

方法は二つあります:

  1. 各ブロッキング操作を独立したスレッドで実行します。
  2. 各ブロッキング操作を非ブロッキングのアスペクトコールに変換します。

もちろん、私たちは第二の方法を推奨します。なぜなら、第一の方法では各接続にスレッドを使用するコストが非常に高いからです。

第二に、生成器をコルーチンとして使用する方法でアスペクトプログラミングを実現できます。イベントループにとっては、コールバックを呼び出すと、一時停止したコルーチンに .send() メソッドを呼び出すのと同じです。一時停止したコルーチンが消費するメモリはスレッドよりもずっと少ないです。

今では、flags_asyncio.py スクリプトが flags.py よりもはるかに速い理由が理解できるでしょう。

因為 flags.py はシーケンシャルに同期ダウンロードを行い、各回のダウンロードには数十億のCPUサイクルを待つ必要があります。一方、flags_asyncio.py では、download_many 関数で loop.run_until_complete メソッドを呼び出す際に、イベントループが各 download_one コルーチンを駆動し、yield from 表現に出て、その表現が各 get_flag コルーチンを駆動し、最初のyield from 表現に到達し、aiohttp.request() 関数を呼び出します。これらの呼び出しはブロッキングされず、したがって数秒以内にすべてのリクエストが同時に開始できます。

asyncio ダウンロードスクリプトを改善します。

現在、上記の flags_asyncio.py を改善し、例外処理とカウンタを追加します。

import asyncio
import collections
from collections import namedtuple
from enum import Enum
import aiohttp
from aiohttp import web
from flags import save_flag, show, main, BASE_URL
DEFAULT_CONCUR_REQ = 5
MAX_CONCUR_REQ = 1000
Result = namedtuple('Result', 'status data')
HTTPStatus = Enum('Status', 'ok not_found error')
# 自定义异常用于包装其他HTTP货网络异常,并获取country_code,以便报告错误
class FetchError(Exception):
 def __init__(self, country_code):
  self.country_code = country_code
@asyncio.coroutine
def get_flag(cc):
 # 此协程有三种返回结果:
 # 1. 返回下载到的图片
 # 2. HTTP 响应为404 时,抛出web.HTTPNotFound 异常
 # 3. 返回其他HTTP状态码时, 抛出aiohttp.HttpProcessingError
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 resp = yield from aiohttp.request('GET', url)
 if resp.status == 200:
  image = yield from resp.read()
  return image
 elif resp.status == 404:
  raise web.HttpNotFound()
 else:
  raise aiohttp.HttpProcessionError(
   code=resp.status, message=resp.reason,
   headers=resp.headers
  )
@asyncio.coroutine
def download_one(cc, semaphore):
 # semaphore 参数是 asyncio.Semaphore 类的实例
 # Semaphore 类是同步装置,用于限制并发请求
 try:
  with (yield from semaphore):
    # 在yield from 表达式中把semaphore 当成上下文管理器使用,防止阻塞整个系统
    # 如果semaphore 计数器的值是所允许的最大值,只有这个协程会阻塞
    image = yield from get_flag(cc)
    # 退出with语句后 semaphore 计数器的值会递减,
    # 解除阻塞可能在等待同一个semaphore对象的其他协程实例
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  save_flag(image, cc.lower()) + '.gif')}
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)
@asyncio.coroutine
def downloader_coro(cc_list):
 counter = collections.Counter()
 # 创建一个 asyncio.Semaphore 实例,最多允许激活MAX_CONCUR_REQ个使用这个计数器的协程
 semaphore = asyncio.Semaphore(MAX_CONCUR_REQ)
 # 多次调用 download_one 协程,创建一个协程对象列表
 to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)]
 # futureが実行終了した後にfutureを返すイテレータを取得
 to_do_iter = asyncio.as_completed(to_do)
 for future in to_do_iter:
  # 実行終了が許されるfutureを反復 
  try:
   res = yield from future # asyncio.Future オブジェクトの結果を取得(future.result でも呼び出せます)
  except FetchError as exc:
   # 抛出された例外はすべてFetchError オブジェクトに包装されています
   country_code = exc.country_code
   try:
    # 元の例外(__cause__)からエラーメッセージを試みる
    error_msg = exc.__cause__.args[0]
   except IndexError:
    # もし元の例外でエラーメッセージが見つからない場合、接続した例外のクラス名を使用する
    error_msg = exc.__cause__.__class__.__name__
   if error_msg:
    msg = '*** Error for {}: {}'
    print(msg.format(country_code, error_msg))
   status = HTTPStatus.error
  else:
   status = res.status
  counter[status] += 1
 return counter
def download_many(cc_list):
 loop = asyncio.get_event_loop()
 coro = downloader_coro(cc_list)
 counts = loop.run_until_complete(coro)
 loop.close()
 return counts
if __name__ == '__main__':
 main(download_many)

コルーチンが発行するリクエストの速度が速いため、サーバーに多くの並行リクエストを送信してサーバーが過負荷になるのを防ぐために、download_coro 関数でasyncio.Semaphore インスタンスを作成し、それをdownload_one 関数に渡しています。

<secion class="caption">Semaphore</section>オブジェクトは内部カウンタを保持しており、.acquire()コルートインメソッドが呼び出された場合、カウンタは減少します;.release()コルートインメソッドが呼び出された場合、カウンタは増加します。カウンタの値は初期設定時に設定されます。

カウンタが0より大きい場合、.acquire()メソッドの呼び出しはブロックされず、カウンタが0の場合、.acquire()メソッドの呼び出しはこのメソッドを呼び出すコルートインをブロックし、他のコルートインが同じSemaphoreオブジェクト上で.release()メソッドを呼び出し、カウンタを増加させるまで待機します。

上記のコードでは、.acquire()や.release()メソッドを手動で呼び出すのではなく、download_one関数でsemaphoreをコンテキストマネージャとして使用しています:

with (yield from semaphore):
 image = yield from get_flag(cc)

このコードは、どんな時でもMAX_CONCUR_REQ個以上のget_flagコルートインがスタートしないことを保証します。

asyncio.as_completed関数を使用

yield fromを使用してasyncio.as_completed関数からfutureの結果を取得する必要があるため、as_completed関数はコルートインで呼び出される必要があります。download_many関数は非コルートインのmain関数に引数として渡されるため、download_many関数のみをイベントループの設定に使用する新しいdownloader_coroコルートインを追加しました。

Executorオブジェクトを使用して、イベントループをブロックすることを防ぎます

ここで、上記の異なるストレージメディアからのデータ読取遅延グラフに戻ってみると、特に注意すべきことはローカルファイルシステムへのアクセスもブロックされることです。

上記のコードでは、save_flag関数がクライアントコードとasyncioイベントループが共有する唯一のスレッドをブロックしているため、ファイルを保存する際には全体のアプリケーションが停止します。この問題を避けるために、イベントループオブジェクトのrun_in_executorメソッドを使用することができます。

asyncioのイベントループはバックグラウンドでThreadPoolExecutorオブジェクトを維持しており、run_in_executorメソッドを呼び出して実行可能なオブジェクトを渡すことができます。

以下に変更後のコードを示します:

@asyncio.coroutine
def download_one(cc, semaphore):
 try:
  with (yield from semaphore):
   image = yield from get_flag(cc)
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  # ここは変更部分
  loop = asyncio.get_event_loop() # 事件ループの参照を取得
  loop.run_in_executor(None, save_flag, image, cc.lower()) + '.gif')}
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)

run_in_executorメソッドの最初の引数はExecutorインスタンスです。Noneに設定されている場合、イベントループのデフォルトのThreadPoolExecutorインスタンスを使用します。

コールバックからfutureへ、そしてコルoutinesへ

コルoutinesに触れる前に、コールバックについてある程度理解しているかもしれません。それでは、コールバックと比較して、コルoutinesはどのような改善を行っていますか?

Pythonでのコールバックコードのスタイル:

def stage1(response1)
 request2 = step1(response1)
 api_call2(request2, stage2)
def stage2(response2)
 request3 = step3(response3)
 api_call3(request3, stage3) 
 def stage3(response3)
  step3(response3) 
api_call1(request1, stage1)

上記のコードの欠点:

  1. コールバック地獄が発生しやすくなります
  2. コードが読みにくくなります

この問題において、コルoutinesは非常に役立ちます。コルoutinesとyield fromを使用した非同期コードの例を以下に示します:

@asyncio.coroutine
def three_stages(request1)
 response1 = yield from api_call1(request1)
 request2 = step1(response1)
 response2 = yield from api_call2(requests)
 request3 = step2(response2)
 response3 = yield from api_call3(requests)
 step3(response3) 
loop.create_task(three_stages(request1)

前のコードと比較して、このコードはもっと理解しやすくなりました。asyncioを呼び出すapi_call1,api_call2,api_call3 例外が発生する場合、対応するyield from表現をtry文内に置くことができます。/except文で例外を処理します。

コルoutinesを使用するには、yield from表現に慣れなければなりません。また、コルoutinesは直接呼び出すことはできず、明示的にコルoutinesの実行時間を設定するか、他のコルoutinesの実行時間を設定したコルoutines内でyield from表現を使用してそれを活性化する必要があります。loop.create_task(three_stages(request1))であれば何も起こりません。

以下に実際の例を示します:

各回のダウンロードで複数のリクエストが発行されます

上記の国旗のダウンロードコードを修正し、国旗をダウンロードする際に国名を取得して、画像を保存する際に使用します。
この問題を解決するためにコルoutinesとyield fromを使用します:

@asyncio.coroutine
def http_get(url):
 resp = yield from aiohttp.request('GET', url)
 if resp.status == 200:
  ctype = resp.headers.get('Content-type', '').lower()
  ctypeに'json'が含まれているか、urlが'json'で終わっている場合:
   data = yield from resp.json()
  else:
   data = yield from resp.read()
  return data
 elif resp.status == 404:
  raise web.HttpNotFound()
 else:
  raise aiohttp.HttpProcessionError(
   code=resp.status, message=resp.reason,
   headers=resp.headers)
@asyncio.coroutine
def get_country(cc):
 url = ""/{cc}/metadata.json".format(BASE_URL, cc=cc.lower())
 metadata = yield from http_get(url)
 return metadata['country']
@asyncio.coroutine
def get_flag(cc):
 url = ""/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
 return (yield from http_get(url))
@asyncio.coroutine
def download_one(cc, semaphore):
 try:
  with (yield from semaphore):
   image = yield from get_flag(cc)
  with (yield from semaphore):
   country = yield from get_country(cc)
 except web.HTTPNotFound:
  status = HTTPStatus.not_found
  msg = 'not found'
 except Exception as exc:
  raise FetchError(cc) from exc
 else:
  country = country.replace(' ', '_')
  filename = '{}--{}.gif'.format(country, cc)
  print(filename)
  loop = asyncio.get_event_loop()
  loop.run_in_executor(None, save_flag, image, filename)
  status = HTTPStatus.ok
  msg = 'ok'
 return Result(status, cc)

このコードでは、download_one関数内でsemaphore制御の2つのwithブロック内でget_flagとget_countryを呼び出すことで、時間を節約するためです。

get_flagのreturn文が外側の括号で囲まれているのは、()の演算子優先順位が高いため、括号内のyield from文の結果が最初に実行されるからです。()を付けないと構文エラーが発生します。

()を付けると、以下のようになります

image = yield from http_get(url)
return image

()を付けない場合、プログラムはyield fromの位置で中断し、コントロールを放棄します。この時、returnを使用すると構文エラーが発生します。

まとめ

この記事では以下のことを議論しました:

  1. マルチスレッドプログラムとasyncio版を比較し、マルチスレッドと非同期タスクの関係を説明しました
  2. asyncio.Futureクラスとconcurrent.futures.Futureクラスの違いを比較しました
  3. 非同期プログラミングを使用して、ネットワークアプリケーションでの高並行処理を管理する方法
  4. 非同期プログラミングでは、コールバックに比べて、コルーチンが性能を大幅に向上させる方法

これでこの記事はすべて終わりです。皆様の学習に役立つことを願っています。また、ナイアラガイドのサポートをどうぞよろしくお願いします。

声明:この記事の内容はインターネットから取得しており、著作権者に帰属します。インターネットユーザーが自発的に貢献し、自己でアップロードした内容であり、このサイトは所有権を持ちません。また、人工編集は行われておらず、関連する法的責任も負いません。著作権侵害を疑う内容があれば、以下のメールアドレスにご連絡ください:notice#oldtoolbag.com(メールを送信する際には、#を@に置き換えてください。通報を行い、関連する証拠を提供してください。一旦確認が取れましたら、このサイトは即座に侵害する可能性のあるコンテンツを削除します。)

おすすめ