Morikatron Engineer Blog

モリカトロン開発者ブログ

WebSocket を TCP の代わりに使ってみる

どうも、モリカトロンでプログラマおじさんをしている岡島です。

現在 TCP を使ってプロセス間の通信を行っているのですが、同一のマシンで沢山のプロセスと通信しようとすると空いているポート番号がたくさん必要になるし、 ネットワーク上の複数の機器を使って分散処理をしようとするとポート開放をしないといけない……と悩むことが多くなってきました。 そこで TCP の代わりに WebSocket を使えないないかを検討しています。

ということで、今回は Python ですでに TCP を使って通信をしているプログラムを出来るだけ少ない変更で WebSocket に置き換えられないかというお話です。

TCP 通信をしていて困ってること

Python に限った話ではありませんが、別のプログラムとデータのやり取りをしたい場合の最も手っ取り早い方法の一つとして TCP が挙げられます。

例えば「ゲームを攻略するAIを作ろう」と思ったとき、ゲームプログラムは Unreal Engine 4 や Unity といったゲームエンジンを利用して作りたいし、 機械学習関連についてはフレームワークが充実している Python を利用したいので、それぞれ別に作って TCP でデータのやり取りを行うという方法はよくあるんじゃないでしょうか。

ということで僕も TCP を使ってプロセス間通信をしていたのですが、分散学習の仕組みを整えたり、それを大規模化しようと考えていくと

  • 同一機器上で TCP 通信をする場合、それぞれに異なる TCP ポート番号を割り当てる必要がある
  • 別のネットワークにある機器や、Firewall 越しに通信する場合、使用する TCP ポートのポート開放が必要となる

ということに頭を悩ませることが多くなりました。

TCP の代わりとして見る WebSocket

そこで、より ”あんまり考えなくていい” ように接続の確立に HTTP を使う WebSocket を代わりに使えないかを検討しています。

実は最初は乗り気じゃなかったのですが、以下のブログを読んでみると接続の確立以降はあまり TCP と変わらなさそうに思えてきます。

kiririmode.hatenablog.jp

同一機器上でもネットワーク越しでも同じように通信できるんなら、乗り換えるメリットが大きそうですね。

TCP を WebSocket に置き換えてみる

実際に TCP 通信を WebSocket に置き換えてみましょう。

Python で使える WebSocket

Python は標準では WebSoket をサポートしていませんので、まずは使うモジュールを探すところからです。

pip 経由でインストール可能なモジュールを探してみると大量に見つかりますが、今回は…

  • ちゃんとドキュメントが用意されている
  • Server と Client の双方が使える
  • ちゃんとメンテナンスされている

などの理由から websockets を選びました。

websockets.readthedocs.io

なにやら asyncio を利用しているみたいですが、サンプルコードも非常にシンプル…… asyncio?

asyncio?

asyncio は Python 3.4 から採用された比較的新しい機能で、コルーチンという仕組みを使って並列処理を行えるようにするものです。
細かい説明はここではしませんので、詳しくは Python 公式ドキュメントを参照ください。

docs.python.org

公式以外の日本語の解説だとこちらが比較的理解しやすい記事だと思います。 この記事を読んでもう少し asyncio を知りたくなった方は合わせてどうぞ。

iuk.hateblo.jp note.crohaco.net

WebSocket で通信したいだけなのに asyncio をひと通り理解して、それに従うようにプログラムを書き直すのはちょっとハイカロリー。 ひとまずは非同期処理の恩恵を得られませんが、

await のついた API を呼び出すときには fuga = func(hoge) と直接呼び出すのではなく、 fuga = asyncio.get_event_loop().run_until_complete( func(hoge) ) という風に書く。
(イベントループで func(hoge) を実行して、それが完了するまで待ち、返り値を fuga に格納するの意)

とだけ理解するところから始めると良いと思います。

簡単な TCP 通信のプログラムを Websocket に置き換えてみる

それでは TCP 通信を WebSocket に置き換えてみましょう。

websockets は pip 経由で配布されているので、事前にインストールしておいてください。

pip install websockets

以下は、TCP 通信で JSON のデータをやり取りする簡単なプログラムです。

tcp_server.py

https://github.com/morikatron/snippet/blob/master/websockets/tcp_server.py

import socket
import json

host = "localhost"
port = 8001
receive_buffer_size = 4096
queue_size = 10

# サーバー立ち上げ
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((host, port))
server.listen(queue_size)

# クライアント接続待ち
client, address = server.accept()

while True:
    # 受信
    received_packet = client.recv(receive_buffer_size)
    if len(received_packet) == 0:
        # クライアントが切断した場合 recv() は 0Byte のByte列を返す
        print("Lost connection.")
        break
    dictionary = json.loads(received_packet.decode())
    print(dictionary)

    # 送信
    dictionary['message'] = 'Message from Server'
    dictionary['number'] = 128
    dictionary['bool'] = False
    packet = json.dumps(dictionary).encode()
    client.send(packet)

# 終了
client.close()
server.close()
print("Finish.")

tcp_client.py

https://github.com/morikatron/snippet/blob/master/websockets/tcp_client.py

import socket
import json

host = "localhost"
port = 8001
receive_buffer_size = 4096

# 接続
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect((host, port))
print("Successfully connected.")

# 送信
dictionary = {'message': 'Message from Client', 'number': 256, 'bool': True}
packet = json.dumps(dictionary).encode()
client.send(packet)

# 受信
received_packet = client.recv(receive_buffer_size)
dictionary = json.loads(received_packet.decode())
print(dictionary)

# 終了
client.close()
print("Finish.")

これと出来るだけ同じように websockets を使って書くと以下のようになります。

ws_server.py

https://github.com/morikatron/snippet/blob/master/websockets/ws_server.py

import asyncio
import websockets
import json

address = "localhost"
port = 8001

# 受信コールバック
async def server(websocket, path):
    # 受信
    received_packet = await websocket.recv()
    dictionary = json.loads(received_packet.decode())
    print("{}: {}".format(path, dictionary))

    # 送信
    dictionary['message'] = 'Message from Server'
    dictionary['bool'] = False
    packet = json.dumps(dictionary).encode()
    await websocket.send(packet)

start_server = websockets.serve(server, address, port)

# サーバー立ち上げ
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

ws_client.py

https://github.com/morikatron/snippet/blob/master/websockets/ws_client.py

import asyncio
import websockets
import json

loop = asyncio.get_event_loop()

# 接続
uri = "ws://localhost:8001"
websocket = loop.run_until_complete(websockets.connect(uri))

# 送信
dictionary = {'message': 'Message from Client', 'number': 256, 'bool': True}
packet = json.dumps(dictionary).encode()
loop.run_until_complete(websocket.send(packet))

# 受信
received_packet = loop.run_until_complete(websocket.recv())
dictionary = json.loads(received_packet.decode())
print(dictionary)

# 終了
loop.run_until_complete(websocket.close())
loop.close()
print("Finish.")

サーバー側は受信をイベントループ側で行う都合上 tcp_server.py に寄せられませんでしたが、クライアント側はほぼ tcp_client.py と同じようになりました。 サーバー側を似せるのは websockets モジュールの仕組み上難しいですし、他の WebSocket モジュールでも難しいので、ここはあきらめるしかなさそうですね。

複数のクライアントが接続するようなコードを書く場合 websockets の方がスッキリ書けそうなので 作り直すコストはきっと少ないんじゃないですかね

同一機器から複数接続してみる

ついでに、TCP では出来なかった同一機器からの複数接続も出来るはずなので試してみましょう。 サーバー側は ws_server.py で問題ないのでクライアント側をマルチスレッドにして…

ws_client_multithread.py (これは正しく動かない)

import asyncio
import websockets
import json
import threading

def thread_func(num):

    # イベントループを各スレッドごとに用意する必要があります。
    loop = asyncio.get_event_loop()
    asyncio.set_event_loop(loop)

    # 接続
    uri = "ws://localhost:8001" + "/client{}".format(num)
    websocket = loop.run_until_complete(websockets.connect(uri))
    print("Connect Success.")

    # 受信
    dictionary = {'message': 'Message from Client', 'number': num, 'bool': True}
    packet = json.dumps(dictionary).encode()  # Python の文字列は UTF-8 なので、BYTE 型に変換して送信する
    loop.run_until_complete(websocket.send(packet))

    # 受信
    received_packet = loop.run_until_complete(websocket.recv())
    dictionary = json.loads(received_packet.decode())
    print(dictionary)

    loop.run_until_complete(websocket.close())
    loop.close()
    print("Finish")

# threading で同時に
threads = []
for i in range(10):
    thread = threading.Thread(target=thread_func, args=(i,))
    threads.append(thread)
    thread.start()
for thread in threads:
    thread.join()

print("Done.")

実行すると asyncio.get_event_loop()RuntimeError が発生して 止まります

asyncio.get_event_loop() はサブスレッドから呼び出せない

日本語でこれを説明している情報が見つかりませんでしたが、stackoverflow のこの質問を見るとどうやらメインスレッド以外のスレッドにはイベントループが存在しないためにエラーになるようです。

stackoverflow.com

解決策は、スレッド毎に asyncio.new_event_loop() イベントループを作成してasyncioがそれを使うようにセットする必要があるそうです。
具体的には以下のようにします。

正しく動く ws_client_multithread.py

https://github.com/morikatron/snippet/blob/master/websockets/ws_client_multithread.py

import asyncio
import websockets
import json
import threading


def thread_func(num):
    # イベントループを各スレッドごとに用意する必要があります。
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)

    # 接続
    uri = "ws://localhost:8001" + "/client{}".format(num)
    websocket = loop.run_until_complete(websockets.connect(uri))
    print("thread {} connected.".format(num))

    # 受信
    dictionary = {'message': 'Message from Client', 'number': num, 'bool': True}
    packet = json.dumps(dictionary).encode()  # Python の文字列は UTF-8 なので、BYTE 型に変換して送信する
    loop.run_until_complete(websocket.send(packet))

    # 受信
    received_packet = loop.run_until_complete(websocket.recv())
    dictionary = json.loads(received_packet.decode())
    print(dictionary)

    loop.run_until_complete(websocket.close())
    loop.close()
    print("thread {} finish.".format(num))


# threading で同時に
threads = []
for i in range(10):
    thread = threading.Thread(target=thread_func, args=(i,))
    threads.append(thread)
    thread.start()
for thread in threads:
    thread.join()

print("Done.")

今度こそ、複数接続が出来ました。

最後に

結局 サーバー側の処理は TCP から移行する際に書き直す必要がありそうですが、(非同期に処理するメリットを捨てれば) asyncio のことを深く考えることなく websockets モジュールを使って WebSocket による通信を実装することは出来そうです。

会社や学校のネットワークだと気軽にポート開放出来ないことも多いでしょうし、通信を実装する際には皆様も websockets を試してみてはいかがでしょうか?