前回はマルチスレッドの概念の簡単な説明とともに、速度の測定方法とマルチスレッドの簡単な利用方法について学びました。今回はその発展として、継承によるマルチスレッド向けのクラスの作成やロックを使ったスレッド間の同期、マルチスレッド以外の並列化手法といった内容を扱います。

継承によるマルチスレッドの実現

前回はthredingモジュールのThreadクラスのコンストラクタにマルチスレッド化したい関数とその引数を渡すという形でマルチスレッドを実現しました。

このほかにもThreadクラス自体を継承することでマルチスレッドとして動作させるクラスを作成して使うこともできます。それほど複雑ではないので、まずはコードを見てみましょう。

import threading, time

# Class definition

class MyThread(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)

    def run(self):
        for i in range(10):
            print('MyThread: ' + str(i))
            time.sleep(1)

# Run threads

mt = MyThread()  # create thread instance
mt.start()  # start the thread
for i in range(10):
    print('Main: ' + str(i))
    time.sleep(1)

プログラムの前半がクラスの定義です。クラス名の宣言箇所を見てもらうとわかるように、threadingモジュールのThreadクラスを継承しています。このクラスを継承することで、マルチスレッドに必要な機能がそのクラスに加えられます。

コンストラクタの中で親クラスであるThreadを初期化しています。このあたりはオブジェクト指向の継承の記事に記載がありますので不安な方は見なおしておいてください。また、次の例で書きますがインスタンス変数の初期化なども必要であれば行ってください。

重要なのはコンストラクタの後にあるrunメソッドの定義です。簡単に言ってしまえばrunメソッドはマルチスレッドとして呼び出される処理を書くものですが、正確に言うと新たに作るのではなく親クラスのメソッドをオーバーライド(上書き)しています。

マルチスレッドの実行は前回とほとんど同じく、クラスをインスタンス化して、そのstartメソッドを呼び出します。するとマルチスレッドとして先ほど定義したrunメソッドが内部で呼び出されます。なお、startではなくrunメソッド自体を呼び出すと、マルチスレッドではなく普通のシングルスレッドとして実行されるので注意してください。

マルチスレッドとして動作させる処理に引数を渡したい場合は、コンストラクタへの引数を経由して渡すのが簡単です。たとえば前回の引数を使うマルチスレッドのプログラムを書き直すと以下のようになります。

import threading, time

class MyThread(threading.Thread):
    def __init__(self, name, sleep_time):
        threading.Thread.__init__(self)
        self.name = name
        self.sleep_time = sleep_time

    def run(self):
        for i in range(10):
            print(self.name + ': ' + str(i))
            time.sleep(self.sleep_time)

thread1 = MyThread('A', 1)
thread2 = MyThread('B', 1)
thread1.start()
thread2.start()

処理を定義する関数run自体ではなく、コンストラクタに引数を渡して、それをインスタンス変数として保持させています。そしてそれをrunメソッドの中で利用していることがわかりますね。このマルチスレッド対応のクラスを作るという手法は、前回お話しした関数をマルチスレッド化する方法に比べて複雑なプログラムを書きやすい場合が多いです。少しコードが長くなってしまうこともありますが、よりオブジェクト指向に沿った設計がしやすいので積極的に使ってください。

スレッドの処理結果の取得

今までにお話したマルチスレッドでは、スレッドをただ立ち上げて処理をするだけでした。そのようにスレッドを走らせるだけでことたりる場合もありますが、「スレッドを走らせてある処理を行い、その結果を取得する」という場合も多いです。

ある処理をさせた結果を取得するには、関数であればreturn文で実現可能です。ただ、スレッドではあくまでもstart()メソッドを呼び出すだけであり、そのまま値をreturnさせることができません。そのため、なんらかの別の方法を使う必要があります。これにはいくつかの方法がありますが、簡単なものは継承したクラスに「返り値を格納するインスタンス変数」や「値を取得するためのメソッド」を定義してあげるというものです。

普通の関数の利用による返り値の取得と、マルチスレッド利用時の取得の違いを以下の図にまとめます。

関数の利用による返り値の取得と、マルチスレッド利用時の取得の違い

簡単な例を通して確認してみましょう。スレッドに時間がかかる処理をさせて、それが終わってから処理結果を取得するというプログラムです。今回はフィボナッチ数列(プログラミングでは有名なお題なので知らない人は調べてみてください)を求めるものとします。

import threading, time

class MyThread(threading.Thread):
    def __init__(self, count):
        threading.Thread.__init__(self)
        self.count = count
        self.return_value = None   # RETURN VALUE

    def run(self):
        sum_value = 0
        for i in range(1, 1 + self.count):
            sum_value += i
            time.sleep(0.1)
        self.return_value = sum_value   # SET RETURN VALUE

    def get_value(self):  # GETTER METHOD
        return self.return_value

thread1 = MyThread(5)
thread1.start()
thread1.join()
print(thread1.return_value)  # 15
print(thread1.get_value())   # 15

上記コードのコメント箇所が値取得のポイントとなります。まず、コンストラクタにて返り値を格納するためのインスタンス変数が定義されています。そしてマルチスレッドとして実行されるrunメソッド内にてこのインタンス変数に値が格納されています。

あとはこのインスタンス変数を"インスタンス.変数"として直接取り出すなり、メソッドを経由してとりだすなりしています。なお、重要なのは値を取り出す前にjoinメソッドでマルチスレッドが終わるまで待っていることです。joinを呼び出さないと、対象となるマルチメソッド内で返り値が得られる前にその値を取り出そうとします。今回であればNoneが帰ってくるはずです。

インスタンス変数を直接取り出すにしろメソッド経由にしろ、結果はどちらも同じなのですが、「Pythonであっても厳格なコードを書く」という場合以外は前者で十分な気がします。

前回利用したthreading.Threadクラスのコンストラクタにマルチスレッド化するメソッドを指定する方式でも返り値を得ることは可能です。ただ、それをやろうとすれば何らかの「実態がひとつとなるオブジェクト」を両スレッドで共有し、それを経由して値をやりとりするなどのあまりオブジェクト指向らしからぬコードになるのであまりオススメできないかもしれません。

あまり真似してほしくないのですが、一応サンプルコードを記載します。

import threading, time

def get_fibo(count, value_dict):
    sum_value = 0
    for i in range(1, 1 + count):
        sum_value += i
        time.sleep(0.1)
        value_dict['fibo'] = sum_value  # Set Return Value

value_dict = {}  # Shared object (dict)
thread1 = threading.Thread(target=get_fibo, args=(5, value_dict,)) 
thread1.start()
thread1.join()
print(value_dict['fibo']) # 15

今回は辞書型のオブジェクトを利用しています。まず辞書オブジェクトを作成し、それをマルチスレッド化する関数に引数として渡します。これはThreadクラスのコンストラクタでやるのでしたね。マルチスレッドとして動作する関数get_fibo内で、この受け取ったオブジェクトに対して返り値を格納しています。そして呼び出し元でそれを取得しています。

数字や文字列といった基本となる型は値渡しであり、リストや辞書型及びインスタンスは参照渡しになります。使いたい型がどちらになるかわからない場合は実際に自分で確かめてみてもいいと思います。

マルチスレッド特有の問題

スレッドの実行自体は簡単ですが、難しいのは「複数のスレッド間で連携をとること」です。逐次実行(マルチスレッドを使わない通常のプログラミングスタイル)であれば、「あれをやってこれをやって」と処理を頭で追うことは可能ですが、いくつもの処理が同時に走るとなると、どのような処理が実際に行われているか想像することが難しくなります。スレッド間の連携のベストプラクティスなどもあるのですが、本連載ではそこまで深入りせずに簡単な概念や手法についてのみ扱います。

マルチスレッドの難しさを簡単な例をあげて説明したいと思います。まずはわかりやすいリソースの競合の問題について扱います。たとえば、あるリソースXをスレッドAとスレッドBから利用しているとしましょう。具体的にはお店の在庫管理のデータXをシステムA(実店舗)とシステムB(オンラインストア)から操作しているとします。このシステムA、Bがそれぞれマルチスレッドだと考えてください。本来はマルチスレッドというよりはデータベースのトランザクションなのですが、本質は同じです。

このとき、ある商品Xを店舗とオンラインストアで同時に購入した場合にリソース競合の対策をしていなければ、管理されている在庫数に不整合が発生する可能性があります。以下の図は不整合が発生するまでの処理の流れとなります。

不整合が発生するまでの処理の流れ

図を上から下に時系列に追ってみます。まず、商品Xの在庫が4あるとしましょう。店舗でお客さんがそれを購入する場合、実店舗のシステムAが在庫数を読み取り、その値からひとつ引いた3を新しい在庫数として登録します。ただ、そのシステムAが在庫数4を読み取り3を設定する間に、オンラインストアのシステムBでも同じ商品Xが購入されたとしましょう。オンラインストアも実店舗と同様に在庫数4を読み取り、新しく在庫数3を設定しようとします。このような状況では、上記のように実際には2つの製品が売れているにもかかわらず、在庫数は3となってしまいます。

このとき何が問題かというと、製品Xの在庫数という同じリソースを同時にスレッドAとスレッドBからアクセスしてしまったことです。Aが在庫数を4 -> 3に更新したあとに、Bが本来であれば3 -> 2にすべきところを古い情報を参照して4 -> 3に変更するつもりで3 -> 3へと変更してしまっています。

このような処理を防ぐためにはスレッド間で同期を行う必要があります。具体的にはリソースXをスレッドAが操作している間は、スレッドBはそれにアクセスできないようにして待たせてしまえばいいのです。処理の流れとしては以下の図のようになります。

スレッド間で同期を行うための処理の流れ

これを実現するためにはセマフォやロックと呼ばれる排他制御を行います。排他制御は読んで字のごとくほかのスレッドに実行させない制御であり、具体的には「ある特定の処理をしている間は、ほかのスレッドはその処理を同時実行せずに終わるまで待つ」ことを実現します。Pythonにはいくつかの排他制御の手法がありますが、今回は一番簡単なthreading.Lockを使います。

まず参考のために排他制御をしないプログラムを書いてみます。2つのスレッドからアクセスされる共通リソースがglobal_counterで、これが5となっています(補足:メソッドrun内のglobal global_counterはグローバル変数にアクセスするための宣言)。各スレッドは「その値を読み込む。作業をする(sleep)。1減らした値を書き込む」という作業をします。2つのスレッドがこの処理をすれば、global_valueは3となってほしいところですが、特定条件においては3になりません。

import threading, time

global_counter = 5

class MyThread(threading.Thread):
    def __init__(self, name, sleep_time):
        threading.Thread.__init__(self)
        self.name = name
        self.sleep_time = sleep_time

    def run(self):
        global global_counter

        # read
        count = global_counter
        print(self.name + ': read global_value ' + str(global_counter))

        # do something
        print(self.name + ': do something')
        time.sleep(self.sleep_time)

        # write
        global_counter = count - 1
        print(self.name + ': write global_value ' + str(global_counter))


thread1 = MyThread('A', 5)
thread2 = MyThread('B', 3)
thread1.start()
time.sleep(1)
thread2.start()

thread1.join()
thread2.join()
print('Result: ' + str(global_counter))

これを実行すると以下のような出力が得られ、最終的にglobal_valueが4になっていることがわかります。

A: read global_value 5
A: do something
B: read global_value 5
B: do something
B: write global_value 4
A: write global_value 4
Result: 4

排他制御を使ったコードに書きなおしてみます。注目してほしいのはglobal_lock変数の使い方です。

import threading, time

global_counter = 5
global_lock = threading.Lock()   # LOCK OBJECT

class MyThread(threading.Thread):
    def __init__(self, name, sleep_time):
        threading.Thread.__init__(self)
        self.name = name
        self.sleep_time = sleep_time

    def run(self):
        global global_counter
        global global_lock

        # LOCK
        global_lock.acquire()

        count = global_counter
        print(self.name + ': read global_value ' + str(global_counter))
        print(self.name + ': do something')
        time.sleep(self.sleep_time)
        global_counter = count - 1
        print(self.name + ': write global_value ' + str(global_counter))

        # RELEASE
        global_lock.release()

まず、排他制御のためのオブジェクトを作っています。そして排他制御を開始したい場合はそのオブジェクトをlockし、作業を行い、releaseします。 あるスレッドがlockをしている場合は、別のスレッドは「lockする箇所」で待機し、リソースがreleaseされると、次は自分がlockをします。

そのため、実行結果は以下のように変わります。

A: read global_value 5
A: do something
A: write global_value 4
B: read global_value 4
B: do something
B: write global_value 3
Result: 3

スレッドAがlockしてからreleaseするまで、スレッドBはread、write処理ができません。そのため、リソース競合が発生せずに問題を回避できています。なお、releaseをすることを決して忘れないでください。たとえば例外が発生してreleaseするコードが飛ばされてしまったという場合でも、lockされた処理をどのスレッドも実行できなくなります。今回はthreading.Lockのインスタンスをglobal変数で定義しましたが、同じクラスのスレッドだけで同期をとるのであればクラス変数を利用することを推奨します。

なお、ロックとリリースを複数のリソースに対して別々に行うとデッドロックといった問題も発生しえます。興味があるかたは調べてみてください。複雑なマルチスレッドのプログラムを設計することもデバッグすることも非常に困難なので、可能なかぎりシンプルな使い方を心がけてください。

マルチスレッド以外の並列化方法

前回、今回とマルチスレッドを使った並列処理について扱いました。単に並列処理をしたいだけであればマルチスレッド以外にもいくつかの手法があるので簡単に紹介します。

まず、シングルスレッドのPythonのプログラムを複数同時に走らせるという手法もあります。たとえばUnixに近い環境を使っている場合、特定のコマンドをバックグラウンドで実行させることが簡単にできます。シェルスクリプトを使って、「Pythonコマンドをバックグランドで実行 -> 同じプログラムを必要な数だけ実行」などとすることで通常のPythonのプログラムを並列に走らせることができます。

また、Pythonもシェルのコマンドを発行できると以前お話ししましたが、それを利用してPythonのマルチスレッドを使って、Pythonコマンドを呼び出すという使い方もできます。メインとなる処理そのものをマルチスレッド化するのではなく、あくまでもコマンドの呼び出しだけをマルチスレッド化しているので、プログラムの設計は極めてシンプルになります。ほかにも並列化の手法や概念はいろいろとありますので興味があるかたは調べてみてください。


次回はPythonでのデバッグについて扱います。

執筆者紹介

伊藤裕一(ITO Yuichi)

シスコシステムズでの業務と大学での研究活動でコンピュータネットワークに6年関わる。専門はL2/L3 Switching とデータセンター関連技術およびSDN。TACとしてシスコ顧客のテクニカルサポート業務に従事。社内向けのソフトウェア関連のトレーニングおよびデータセンタとSDN関係の外部講演なども行う。

もともと仮想ネットワーク関連技術の研究開発に従事していたこともあり、ネットワークだけでなくプログラミングやLinux関連技術にも精通。Cisco社内外向けのトラブルシューティングツールの開発や、趣味で音声合成処理のアプリケーションやサービスを開発。

Cisco CCIE R&S, Red Hat Certified Engineer, Oracle Java Gold,2009年度 IPA 未踏プロジェクト採択

詳細(英語)はこちら