サーバソケットの多重化(epoll())(Pythonによるネットワークプログラミング)

本記事は、Pythonによるネットワークプログラミングについての学習メモとなります。
参考書籍としてLinuxネットワークプログラミングバイブルを用い、同書の内容に沿ったかたちで、Pythonに書き直しをしていきます。
今回は、サーバソケットのマルチクライアント化(多重化)の方法として、epoll()を利用した方法について学んでいきます。
ネットワークプログラミングについて
ネットワークプログラミングの目的はデータの送受信をすることで、そのためにソケットというインターフェースを利用します。
このソケットは、TCP/IPの誕生時にBSD Uinux上に実装されたものですが、非常に使い勝手が良かったため、WindowsなどのUnix系以外のOSでも利用されています。
Pythonで実装する際の手順やオプションの設定なども、概ねそのままプログラミングすることができます。
多重化とは
通常のサーバソケットでは、accept()(もしくはrecv())を呼び出した後、それ以降の処理をブロックしてしまいます(ブロッキングIO)。
そのため、他のコネクションが確立したソケットは、その処理が完了するまで待機させられ、クライアントごとの同時接続ができない使い勝手の悪いアプリケーションになってしまいます。
これらの問題を解決するためには、多重化と呼ばれる技術を使い、マルチクライアント化に対応する必要があります。
epoll()によるマルチクライアント化
前回はpoll()を用いた多重化について学びました。
接続するクライアントが少ない場合は、前回で学んだselect()やpoll()で行ったほうが処理速度としても問題はありませんが、クライアントが多くなった場合にループ処理の箇所での負荷がかかります。
それに比べてepoll()では、接続準備ができたディスクリプタのみ通知され、それに対してのみループ処理を実行するため、その分パフォーマンスが向上します。
監視するIOイベント情報の詳細は以下になります。
|
定数 |
意味 |
|---|---|
|
|
読み込み可能 |
|
|
書き込み可能 |
|
|
緊急の読み出しデータ |
|
|
設定された fd にエラー状態が発生した |
|
|
設定された fd がハングアップした |
|
|
エッジトリガ動作に設定する。デフォルトではレベルトリガ動作 |
|
|
1ショット動作に設定する。1回イベントが取り出されたら、その fd が内部で無効になる |
|
|
関連づけられた fd にイベントがある場合、1 つの epoll オブジェクトのみを起こします。デフォルトでは (このフラグが設定されていない場合には)、fd に対してポーリングするすべての epoll オブジェクトを起こします。 |
|
|
ストリームソケットの他端が接続を切断したか、接続の書き込み側のシャットダウンを行った。 |
|
|
|
|
|
優先データバンドを読み込める。 |
|
|
|
|
|
優先データに書き込みできる。 |
|
|
無視される。 |
https://docs.python.org/ja/3/library/select.htmlより抜粋
なお、タイムアウト値は秒単位で指定をします。
サーバプログラムの作成
今回は前回作成したサーバプログラムにepoll()によるマルチクライアント機能を実装していきます。
以下がソースコードになります。
# server_multi_epoll.py
import socket
import sys
import fcntl
import os
import errno
import select
MAX_CHILD = 20
def server_socket(portnum):
try:
for res in socket.getaddrinfo(None, portnum, socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_PASSIVE):
af, socktype, proto, canonname, sa = res
break
except socket.gaieor as e:
print("getaddrinfo():{}".format(e))
sys.exit(1)
try:
nbuf, sbuf = socket.getnameinfo(sa, socket.AI_PASSIVE)
except socket.gaieor as e:
print("getnameinfo():{}".format(e))
sys.exit(1)
print("port={}".format(sbuf))
try:
soc = socket.socket(af, socktype, proto)
except OSError as e:
print("socket:{}".format(e))
sys.exit(1)
try:
soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except OSError as e:
print("setsockopt:{}".format(e))
sys.exit(1)
try:
soc.bind(sa)
except OSError as e:
print("bind:{}".format(e))
soc.close()
sys.exit(1)
try:
soc.listen(socket.SOMAXCONN)
except OSError as e:
print("listen:{}".format(e))
soc.close()
sys.exit(1)
return soc
def accept_loop(soc):
timeout = 10
ev = select.epoll()
ev.register(soc, select.EPOLLIN)
count = 0
child = []
print("<<child count:{}>>".format(count))
while True:
try:
try:
r = ev.poll(timeout)
except OSError as e:
print("poll:{}".format(e))
break
if not r:
print("poll:timeout")
else:
for fd, event in r:
if fd == soc.fileno() and event & select.EPOLLIN:
acc, addr = soc.accept()
hbuf, sbuf = socket.getnameinfo(addr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
print("accept:{}:{}".format(hbuf, sbuf))
if len(child) > MAX_CHILD:
print("child is full: cannot accept")
acc.close()
else:
ev.register(acc)
child.append(acc)
count += 1
print("<<child count:{}>>".format(count))
elif event & (select.EPOLLIN | select.EPOLLERR):
for i, c in enumerate(child):
if c.fileno() == fd:
if send_recv(c, i+1) == -1:
child.remove(c)
ev.unregister(c)
count -= 1
print("<<child count:{}>>".format(count))
except InterruptedError as e:
if e.errno != errno.EINTR:
print("accept:{}".format(e))
def send_recv(acc, child_no):
buf_size = 512
try:
data = acc.recv(buf_size)
except InterruptedError as e:
print("recv:{}".format(e))
return -1
if (len(data) == 0):
# EOF
print("[child{}]recv:EOF".format(child_no))
return -1
data = data.rstrip()
try:
print("[child{}]{}".format(child_no, data.decode('utf-8')))
except UnicodeDecodeError:
pass
try:
acc.send(data + b':OK\r\n')
except InterruptedError as e:
print("send:{}".format(e))
return -1
return 0
if __name__ == '__main__':
if (len(sys.argv) != 2):
print("Usage: {} <server port>".format(sys.argv[0]))
sys.exit(1)
soc = server_socket(sys.argv[1])
print("ready for accept")
try:
accept_loop(soc)
soc.close()
except KeyboardInterrupt:
soc.close()
sys.exit(1)
動作確認
それでは、上記プログラムを実行してみます。
> python3 server_multi_epoll.py 8888 port=8888 ready for accept <<child count:0>> accept:127.0.0.1:52819 # クライアント1接続 <<child count:1>> [child1]ok <<child count:1>> accept:127.0.0.1:52820 # クライアント2接続 <<child count:2>> [child2]ok <<child count:2>> [child1]recv:EOF # クライアント1切断 <<child count:1>> [child1]recv:EOF # クライアント2切断 <<child count:0>>
最後に
今回はサーバのマルチクライアント化として、poll()を利用した方法について学びました。
次回はfork()を利用したマルチプロセスによるマルチクライアント化について学んでいきます。