サーバソケットの多重化(fork())(Pythonによるネットワークプログラミング)

本記事は、Pythonによるネットワークプログラミングについての学習メモとなります。
参考書籍としてLinuxネットワークプログラミングバイブルを用い、同書の内容に沿ったかたちで、Pythonに書き直しをしていきます。
今回は、サーバソケットのマルチクライアント化(多重化)の方法として、fork()を利用した方法について学んでいきます。
ネットワークプログラミングについて
ネットワークプログラミングの目的はデータの送受信をすることで、そのためにソケットというインターフェースを利用します。
このソケットは、TCP/IPの誕生時にBSD Uinux上に実装されたものですが、非常に使い勝手が良かったため、WindowsなどのUnix系以外のOSでも利用されています。
Pythonで実装する際の手順やオプションの設定なども、概ねそのままプログラミングすることができます。
多重化とは
通常のサーバソケットでは、accept()(もしくはrecv())を呼び出した後、それ以降の処理をブロックしてしまいます(ブロッキングIO)。
そのため、他のコネクションが確立したソケットは、その処理が完了するまで待機させられ、クライアントごとの同時接続ができない使い勝手の悪いアプリケーションになってしまいます。
これらの問題を解決するためには、多重化と呼ばれる技術を使い、マルチクライアント化に対応する必要があります。
fork()によるマルチクライアント化
前回はepoll()を用いた多重化について学びました。
前回までで学んだselect(), poll(), epoll()などによる多重化では、一つのサーバプロセス内でマルチクライアント化を実現していました。
今回のfork()では、クライアントから接続要求があった際にサーバプロセス(親プロセス)がクライアントごとに新たなプロセス(子プロセス)を生成して、マルチクライアント化に対応します。
Pythonではmultiprocessingモジュールを使うことで、容易にマルチプロセスを利用したプログラムを作成することが可能となっておりますが、Unix環境で実行する際は内部でos.fork()が利用されています。
マルチプロセスでは、新たに別のプロセスを生成し、それぞれがOSから与えられた別のメモリ空間上でプログラムを動かすため、グローバル変数などで互いに参照することはできません。
しかし、それぞれが独立したプロセスのため、子プロセスが異常終了したからといって親プロセスや他の子プロセスまで異常終了するようなことはありません。
サーバプログラムの作成
それでは、fork()を利用したマルチプロセスによるサーバプログラムを作成します。
以下がソースコードになります。
# server_multi_fork.py
import socket
import sys
import signal
import os
import errno
def server_socket(portnum):
try:
for res in socket.getaddrinfo(None, portnum, socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_PASSIVE):
af, socktype, proto, canonname, sa = res
break
except socket.gaieor as e:
print("getaddrinfo():{}".format(e))
sys.exit(1)
try:
nbuf, sbuf = socket.getnameinfo(sa, socket.AI_PASSIVE)
except socket.gaieor as e:
print("getnameinfo():{}".format(e))
sys.exit(1)
print("port={}".format(sbuf))
try:
soc = socket.socket(af, socktype, proto)
except OSError as e:
print("socket:{}".format(e))
sys.exit(1)
try:
soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
except OSError as e:
print("setsockopt:{}".format(e))
sys.exit(1)
try:
soc.bind(sa)
except OSError as e:
print("bind:{}".format(e))
soc.close()
sys.exit(1)
try:
soc.listen(socket.SOMAXCONN)
except OSError as e:
print("listen:{}".format(e))
soc.close()
sys.exit(1)
return soc
def accept_loop(soc):
while True:
try:
acc, addr = soc.accept()
hbuf, sbuf = socket.getnameinfo(addr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
print("accept:{}:{}".format(hbuf, sbuf))
try:
pid = os.fork()
except OSError as e:
print("fork():{}".format(e))
sys.exit(1)
if pid == 0:
send_recv(acc, hbuf, sbuf)
acc.close()
sys.exit(0)
elif pid > 0:
pass
try:
pid, status = os.waitpid(-1, os.WNOHANG)
except ChildProcessError as e:
continue
if pid > 0:
print("accept=loop:waitpid:pid={},status={}".format(pid, status))
print('WIFEXITED:{},WEXITSTATUS:{},WIFSIGNALED:{}\
WTERMSIG:{},WIFSTOPPED:{},WSTOPSIG:{}'.format(
os.WIFEXITED(status),
os.WEXITSTATUS(status),
os.WIFSIGNALED(status),
os.WTERMSIG(status),
os.WIFSTOPPED(status),
os.WSTOPSIG(status)))
except InterruptedError as e:
if e.errno != errno.EINTR:
print("accept:{}".format(e))
def send_recv(acc, host, port):
buf_size = 512
while True:
try:
data = acc.recv(buf_size)
except InterruptedError as e:
print("recv:{}".format(e))
break
if (len(data) == 0):
# EOF
print("[{}]recv:EOF".format(os.getpid()))
break
data = data.rstrip()
try:
print("[{}]{}".format(os.getpid(), data.decode('utf-8')))
except UnicodeDecodeError:
pass
try:
acc.send(data + b':OK\r\n')
except InterruptedError as e:
print("send:{}".format(e))
break
acc.close()
def sig_child_handler(sig, frame):
pid, status = os.wait()
print('signum={} sig_chld_handler:wait:pid={}, status={}'.format(sig,pid, status))
print('WIFEXITED:{},WEXITSTATUS:{},WIFSIGNALED:{}\
WTERMSIG:{},WIFSTOPPED:{},WSTOPSIG:{}'.format(
os.WIFEXITED(status),
os.WEXITSTATUS(status),
os.WIFSIGNALED(status),
os.WTERMSIG(status),
os.WIFSTOPPED(status),
os.WSTOPSIG(status)))
if __name__ == '__main__':
if (len(sys.argv) != 2):
print("Usage: {} <server port>.format(sys.argv[0]))
sys.exit(1)
signal.signal(signal.SIGCHLD, sig_child_handler)
soc = server_socket(sys.argv[1])
print("ready for accept")
try:
accept_loop(soc)
soc.close()
except KeyboardInterrupt:
soc.close()
sys.exit(1)
動作確認
それでは、上記プログラムを実行してみます。
> python3 server_multi_fork.py 8888 port=8888 ready for accept accept:127.0.0.1:55546 # クライアント1接続 accept:127.0.0.1:55547 # クライアント2接続 [2481]ok [2576]ok [2481]recv:EOF # クライアント1切断 signum=17 sig_chld_handler:wait:pid=2481, status=0 WIFEXITED:True,WEXITSTATUS:0,WIFSIGNALED:False WTERMSIG:0,WIFSTOPPED:False,WSTOPSIG:0 [2576]recv:EOF # クライアント2切断 signum=17 sig_chld_handler:wait:pid=2576, status=0 WIFEXITED:True,WEXITSTATUS:0,WIFSIGNALED:False WTERMSIG:0,WIFSTOPPED:False,WSTOPSIG:0 # プロセスの状態 > ps -ef | grep server_multi vagrant 2479 1854 0 06:36 pts/0 00:00:00 python3 server_multi_fork.py 8888 vagrant 2481 2479 0 06:36 pts/0 00:00:00 python3 server_multi_fork.py 8888 vagrant 2576 2479 0 06:37 pts/0 00:00:00 python3 server_multi_fork.py 8888
ちゃんと子プロセスが生成されていることが確認できました。
最後に
今回はサーバのマルチクライアント化として、fork()を利用した方法について学びました。
次回はthreadingモジュール(pthread)を利用したマルチスレッドによるマルチクライアント化について学んでいきます。