どうも、モリカトロンでプログラマおじさんをしている岡島です。
現在 TCP を使ってプロセス間の通信を行っているのですが、同一のマシンで沢山のプロセスと通信しようとすると空いているポート番号がたくさん必要になるし、 ネットワーク上の複数の機器を使って分散処理をしようとするとポート開放をしないといけない……と悩むことが多くなってきました。 そこで TCP の代わりに WebSocket を使えないないかを検討しています。
ということで、今回は Python ですでに TCP を使って通信をしているプログラムを出来るだけ少ない変更で WebSocket に置き換えられないかというお話です。
TCP 通信をしていて困ってること
Python に限った話ではありませんが、別のプログラムとデータのやり取りをしたい場合の最も手っ取り早い方法の一つとして TCP が挙げられます。
例えば「ゲームを攻略するAIを作ろう」と思ったとき、ゲームプログラムは Unreal Engine 4 や Unity といったゲームエンジンを利用して作りたいし、 機械学習関連についてはフレームワークが充実している Python を利用したいので、それぞれ別に作って TCP でデータのやり取りを行うという方法はよくあるんじゃないでしょうか。
ということで僕も TCP を使ってプロセス間通信をしていたのですが、分散学習の仕組みを整えたり、それを大規模化しようと考えていくと
- 同一機器上で TCP 通信をする場合、それぞれに異なる TCP ポート番号を割り当てる必要がある
- 別のネットワークにある機器や、Firewall 越しに通信する場合、使用する TCP ポートのポート開放が必要となる
ということに頭を悩ませることが多くなりました。
TCP の代わりとして見る WebSocket
そこで、より ”あんまり考えなくていい” ように接続の確立に HTTP を使う WebSocket を代わりに使えないかを検討しています。
実は最初は乗り気じゃなかったのですが、以下のブログを読んでみると接続の確立以降はあまり TCP と変わらなさそうに思えてきます。
同一機器上でもネットワーク越しでも同じように通信できるんなら、乗り換えるメリットが大きそうですね。
TCP を WebSocket に置き換えてみる
実際に TCP 通信を WebSocket に置き換えてみましょう。
Python で使える WebSocket
Python は標準では WebSoket をサポートしていませんので、まずは使うモジュールを探すところからです。
pip 経由でインストール可能なモジュールを探してみると大量に見つかりますが、今回は…
- ちゃんとドキュメントが用意されている
- Server と Client の双方が使える
- ちゃんとメンテナンスされている
などの理由から websockets を選びました。
なにやら asyncio を利用しているみたいですが、サンプルコードも非常にシンプル…… asyncio?
asyncio?
asyncio は Python 3.4 から採用された比較的新しい機能で、コルーチンという仕組みを使って並列処理を行えるようにするものです。
細かい説明はここではしませんので、詳しくは Python 公式ドキュメントを参照ください。
公式以外の日本語の解説だとこちらが比較的理解しやすい記事だと思います。 この記事を読んでもう少し asyncio を知りたくなった方は合わせてどうぞ。
iuk.hateblo.jp note.crohaco.net
WebSocket で通信したいだけなのに asyncio をひと通り理解して、それに従うようにプログラムを書き直すのはちょっとハイカロリー。 ひとまずは非同期処理の恩恵を得られませんが、
await のついた API を呼び出すときには
fuga = func(hoge)
と直接呼び出すのではなく、fuga = asyncio.get_event_loop().run_until_complete( func(hoge) )
という風に書く。
(イベントループでfunc(hoge)
を実行して、それが完了するまで待ち、返り値を fuga に格納するの意)
とだけ理解するところから始めると良いと思います。
簡単な TCP 通信のプログラムを Websocket に置き換えてみる
それでは TCP 通信を WebSocket に置き換えてみましょう。
websockets は pip 経由で配布されているので、事前にインストールしておいてください。
pip install websockets
以下は、TCP 通信で JSON のデータをやり取りする簡単なプログラムです。
tcp_server.py
https://github.com/morikatron/snippet/blob/master/websockets/tcp_server.py
import socket import json host = "localhost" port = 8001 receive_buffer_size = 4096 queue_size = 10 # サーバー立ち上げ server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) server.bind((host, port)) server.listen(queue_size) # クライアント接続待ち client, address = server.accept() while True: # 受信 received_packet = client.recv(receive_buffer_size) if len(received_packet) == 0: # クライアントが切断した場合 recv() は 0Byte のByte列を返す print("Lost connection.") break dictionary = json.loads(received_packet.decode()) print(dictionary) # 送信 dictionary['message'] = 'Message from Server' dictionary['number'] = 128 dictionary['bool'] = False packet = json.dumps(dictionary).encode() client.send(packet) # 終了 client.close() server.close() print("Finish.")
tcp_client.py
https://github.com/morikatron/snippet/blob/master/websockets/tcp_client.py
import socket import json host = "localhost" port = 8001 receive_buffer_size = 4096 # 接続 client = socket.socket(socket.AF_INET, socket.SOCK_STREAM) client.connect((host, port)) print("Successfully connected.") # 送信 dictionary = {'message': 'Message from Client', 'number': 256, 'bool': True} packet = json.dumps(dictionary).encode() client.send(packet) # 受信 received_packet = client.recv(receive_buffer_size) dictionary = json.loads(received_packet.decode()) print(dictionary) # 終了 client.close() print("Finish.")
これと出来るだけ同じように websockets を使って書くと以下のようになります。
ws_server.py
https://github.com/morikatron/snippet/blob/master/websockets/ws_server.py
import asyncio import websockets import json address = "localhost" port = 8001 # 受信コールバック async def server(websocket, path): # 受信 received_packet = await websocket.recv() dictionary = json.loads(received_packet.decode()) print("{}: {}".format(path, dictionary)) # 送信 dictionary['message'] = 'Message from Server' dictionary['bool'] = False packet = json.dumps(dictionary).encode() await websocket.send(packet) start_server = websockets.serve(server, address, port) # サーバー立ち上げ asyncio.get_event_loop().run_until_complete(start_server) asyncio.get_event_loop().run_forever()
ws_client.py
https://github.com/morikatron/snippet/blob/master/websockets/ws_client.py
import asyncio import websockets import json loop = asyncio.get_event_loop() # 接続 uri = "ws://localhost:8001" websocket = loop.run_until_complete(websockets.connect(uri)) # 送信 dictionary = {'message': 'Message from Client', 'number': 256, 'bool': True} packet = json.dumps(dictionary).encode() loop.run_until_complete(websocket.send(packet)) # 受信 received_packet = loop.run_until_complete(websocket.recv()) dictionary = json.loads(received_packet.decode()) print(dictionary) # 終了 loop.run_until_complete(websocket.close()) loop.close() print("Finish.")
サーバー側は受信をイベントループ側で行う都合上 tcp_server.py に寄せられませんでしたが、クライアント側はほぼ tcp_client.py と同じようになりました。 サーバー側を似せるのは websockets モジュールの仕組み上難しいですし、他の WebSocket モジュールでも難しいので、ここはあきらめるしかなさそうですね。
複数のクライアントが接続するようなコードを書く場合 websockets の方がスッキリ書けそうなので 作り直すコストはきっと少ないんじゃないですかね
同一機器から複数接続してみる
ついでに、TCP では出来なかった同一機器からの複数接続も出来るはずなので試してみましょう。 サーバー側は ws_server.py で問題ないのでクライアント側をマルチスレッドにして…
ws_client_multithread.py (これは正しく動かない)
import asyncio import websockets import json import threading def thread_func(num): # イベントループを各スレッドごとに用意する必要があります。 loop = asyncio.get_event_loop() asyncio.set_event_loop(loop) # 接続 uri = "ws://localhost:8001" + "/client{}".format(num) websocket = loop.run_until_complete(websockets.connect(uri)) print("Connect Success.") # 受信 dictionary = {'message': 'Message from Client', 'number': num, 'bool': True} packet = json.dumps(dictionary).encode() # Python の文字列は UTF-8 なので、BYTE 型に変換して送信する loop.run_until_complete(websocket.send(packet)) # 受信 received_packet = loop.run_until_complete(websocket.recv()) dictionary = json.loads(received_packet.decode()) print(dictionary) loop.run_until_complete(websocket.close()) loop.close() print("Finish") # threading で同時に threads = [] for i in range(10): thread = threading.Thread(target=thread_func, args=(i,)) threads.append(thread) thread.start() for thread in threads: thread.join() print("Done.")
実行すると asyncio.get_event_loop()
で RuntimeError
が発生して 止まります。
asyncio.get_event_loop() はサブスレッドから呼び出せない
日本語でこれを説明している情報が見つかりませんでしたが、stackoverflow のこの質問を見るとどうやらメインスレッド以外のスレッドにはイベントループが存在しないためにエラーになるようです。
解決策は、スレッド毎に asyncio.new_event_loop() イベントループを作成してasyncioがそれを使うようにセットする必要があるそうです。
具体的には以下のようにします。
正しく動く ws_client_multithread.py
https://github.com/morikatron/snippet/blob/master/websockets/ws_client_multithread.py
import asyncio import websockets import json import threading def thread_func(num): # イベントループを各スレッドごとに用意する必要があります。 loop = asyncio.new_event_loop() asyncio.set_event_loop(loop) # 接続 uri = "ws://localhost:8001" + "/client{}".format(num) websocket = loop.run_until_complete(websockets.connect(uri)) print("thread {} connected.".format(num)) # 受信 dictionary = {'message': 'Message from Client', 'number': num, 'bool': True} packet = json.dumps(dictionary).encode() # Python の文字列は UTF-8 なので、BYTE 型に変換して送信する loop.run_until_complete(websocket.send(packet)) # 受信 received_packet = loop.run_until_complete(websocket.recv()) dictionary = json.loads(received_packet.decode()) print(dictionary) loop.run_until_complete(websocket.close()) loop.close() print("thread {} finish.".format(num)) # threading で同時に threads = [] for i in range(10): thread = threading.Thread(target=thread_func, args=(i,)) threads.append(thread) thread.start() for thread in threads: thread.join() print("Done.")
今度こそ、複数接続が出来ました。
最後に
結局 サーバー側の処理は TCP から移行する際に書き直す必要がありそうですが、(非同期に処理するメリットを捨てれば) asyncio のことを深く考えることなく websockets モジュールを使って WebSocket による通信を実装することは出来そうです。
会社や学校のネットワークだと気軽にポート開放出来ないことも多いでしょうし、通信を実装する際には皆様も websockets を試してみてはいかがでしょうか?