pythonのasyncioでタスク分散に自前のキューを使ったメモ

Pythonasyncioでタスク分散を行った。自前のキューと処理時間、PriorityQueueについての記録を残しておく。

asyncio

asyncioとはPythonで並行処理を書くためのライブラリ。awaitasyncキーワードを使って処理の完了を待機できる。

これを元の状態を分割してキューに入れて、タスク分散して使うことがあり、最後の処理だけを元の順番通りに行いたい場合があった。

その解決方法として自前のキューを用意したので、そのキューに関してメモを残しておきたい。

絶対に順番に取り出すキュー

なるべくpriorityが小さいものを取り出したいので、asyncio.PriorityQueueを継承する。 処理済みのpriorityより大きいものを取り出してしまった場合は、キューに入れ直す。すぐに入れ直すと、再び同じものを取り出してしまう可能性が高いと思うので、0.5秒ほど待ってから入れ直している。

処理したアイテムのpriorityを保持して、キューから取り出したものと比較して返すだけのシンプルなものとなった:

class OrderedQueue(asyncio.PriorityQueue):
    """getの際、priorityが小さいものから順に返すキュー。優先度はインクリメントすること。同一優先度の2つ目以降は無視されるはず。
    (pri:int, ...)の形式を想定。priorityは0から始める。
    """

    def __init__(self, maxsize: int = 0) -> None:
        super().__init__(maxsize)
        self._last_gotten_priority = None

    async def get(self) -> Coroutine[Any, Any, Any]:
        while True:
            item = await super().get()
            pri = item[0]
            if self._last_gotten_priority is None:
                if pri == 0:
                    self._last_gotten_priority = pri
                    return item
            if self._last_gotten_priority is not None:
                if pri == self._last_gotten_priority + 1:
                    self._last_gotten_priority = pri
                    return item
            await asyncio.sleep(0.5)  # すぐの追加を避ける 
            await self.put(item)
            self.task_done()  # ループ最初にgetした分のタスクを終了したものとする

分散しない場合との比較

上のキューとは関連性が低いが、asyncioでタスク分散した場合の処理時間を比較しておきたい。

最終処理を順番に行う都合上、分散した場合の利点は最終処理より前のタスクに限られる。そのため、処理全体の時間と1つ目の最終処理直前に到達した時間を計測してみた。サンプル不足は否めない。

結果は下のようになった:

分散しない場合 時間(秒)
proc time 127.45
2回目 118.57
最終処理直前の時間 15.57
分散した場合 時間(秒)
proc time 109.14
2回目 110.06
最初のアイテムの最終処理直前の時間 2.71

いくつに分散させるかで変わるが、今回は4つで分散している。一塊で処理する場合よりも分散したほうが、最終処理直前の時間がとても速い。12秒ほど速くなっている。

そもそもの最終処理が時間がかかるものなので、全体としては速く最終処理に取り掛かれた分だけ速くなっていると思われる。

PriorityQueue

asycioにはPriorityQueueというものが既に実装されているが、これはキューを取り出すときにキュー内のアイテムの最も大小関係で小さいものが取り出されるというもの。実装を見るとわかるが、heapq.heappushをputの際に行っている。

これはキュー内の話なので、まだ追加されていない全体を知りようがないので順番には取り出すことができない。そこで今回のOrderedQueueを実装した。

最終処理以外のタスクはこのPriorityQueueを使っている。

おわり

実装したOrderedQueueはもはやキューとは呼べないもののような気もするが、asyncioでPriorityQueueとの協調を行う都合上、恐らくこの形がやりやすいかなと思う。

分散する数の最適なものも探すべきなのかもしれない。

asyncio自体は積極的に使いたい。aiohttpというライブラリもなかなか良いものだ。

以上です。

タイトルとURLをコピーしました