Engineering Note

プログラミングなどの技術的なメモ

サーバソケットの多重化(fork())(Pythonによるネットワークプログラミング)

client

本記事は、Pythonによるネットワークプログラミングについての学習メモとなります。

参考書籍としてLinuxネットワークプログラミングバイブルを用い、同書の内容に沿ったかたちで、Pythonに書き直しをしていきます。

今回は、サーバソケットのマルチクライアント化(多重化)の方法として、fork()を利用した方法について学んでいきます。

 

 

ネットワークプログラミングについて

ネットワークプログラミングの目的はデータの送受信をすることで、そのためにソケットというインターフェースを利用します。

このソケットは、TCP/IPの誕生時にBSD Uinux上に実装されたものですが、非常に使い勝手が良かったため、WindowsなどのUnix系以外のOSでも利用されています。

Pythonで実装する際の手順やオプションの設定なども、概ねそのままプログラミングすることができます。

 

多重化とは

通常のサーバソケットでは、accept()(もしくはrecv())を呼び出した後、それ以降の処理をブロックしてしまいます(ブロッキングIO)。

そのため、他のコネクションが確立したソケットは、その処理が完了するまで待機させられ、クライアントごとの同時接続ができない使い勝手の悪いアプリケーションになってしまいます。

これらの問題を解決するためには、多重化と呼ばれる技術を使い、マルチクライアント化に対応する必要があります。

 

fork()によるマルチクライアント化

前回はepoll()を用いた多重化について学びました。

 

 

前回までで学んだselect(), poll(), epoll()などによる多重化では、一つのサーバプロセス内でマルチクライアント化を実現していました。

今回のfork()では、クライアントから接続要求があった際にサーバプロセス(親プロセス)がクライアントごとに新たなプロセス(子プロセス)を生成して、マルチクライアント化に対応します。

Pythonではmultiprocessingモジュールを使うことで、容易にマルチプロセスを利用したプログラムを作成することが可能となっておりますが、Unix環境で実行する際は内部でos.fork()が利用されています。

 

 

マルチプロセスでは、新たに別のプロセスを生成し、それぞれがOSから与えられた別のメモリ空間上でプログラムを動かすため、グローバル変数などで互いに参照することはできません。

しかし、それぞれが独立したプロセスのため、子プロセスが異常終了したからといって親プロセスや他の子プロセスまで異常終了するようなことはありません。

 

サーバプログラムの作成 

それでは、fork()を利用したマルチプロセスによるサーバプログラムを作成します。

以下がソースコードになります。

 

# server_multi_fork.py
import socket
import sys
import signal
import os
import errno

def server_socket(portnum):
  try:
    for res in socket.getaddrinfo(None, portnum, socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_PASSIVE):
      af, socktype, proto, canonname, sa = res
      break
  except socket.gaieor as e:
    print("getaddrinfo():{}".format(e))
    sys.exit(1)

  try:
    nbuf, sbuf = socket.getnameinfo(sa, socket.AI_PASSIVE)
  except socket.gaieor as e:
    print("getnameinfo():{}".format(e))
    sys.exit(1)

  print("port={}".format(sbuf))

  try:
    soc = socket.socket(af, socktype, proto)
  except OSError as e:
    print("socket:{}".format(e))
    sys.exit(1)

  try:
    soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  except OSError as e:
    print("setsockopt:{}".format(e))
    sys.exit(1)

  try:
    soc.bind(sa)
  except OSError as e:
    print("bind:{}".format(e))
    soc.close()
    sys.exit(1)

  try:
    soc.listen(socket.SOMAXCONN)
  except OSError as e:
    print("listen:{}".format(e))
    soc.close()
    sys.exit(1)

  return soc

def accept_loop(soc):
  while True:
    try:
      acc, addr = soc.accept()
      hbuf, sbuf = socket.getnameinfo(addr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
      print("accept:{}:{}".format(hbuf, sbuf))
      try:
        pid = os.fork()
      except OSError as e:
        print("fork():{}".format(e))
        sys.exit(1)
      if pid == 0:
        send_recv(acc, hbuf, sbuf)
        acc.close()
        sys.exit(0)
      elif pid > 0:
        pass
      
      try:
        pid, status = os.waitpid(-1, os.WNOHANG)
      except ChildProcessError as e:
        continue
      if pid > 0:
        print("accept=loop:waitpid:pid={},status={}".format(pid, status))
        print('WIFEXITED:{},WEXITSTATUS:{},WIFSIGNALED:{}\
        WTERMSIG:{},WIFSTOPPED:{},WSTOPSIG:{}'.format(
        os.WIFEXITED(status),
        os.WEXITSTATUS(status),
        os.WIFSIGNALED(status),
        os.WTERMSIG(status),
        os.WIFSTOPPED(status),
        os.WSTOPSIG(status)))
    except InterruptedError as e:
      if e.errno != errno.EINTR:
        print("accept:{}".format(e))
    
def send_recv(acc, host, port):
    buf_size = 512
    while True:
        try:
            data = acc.recv(buf_size)
        except InterruptedError as e:
            print("recv:{}".format(e))
            break 

        if (len(data) == 0):
            # EOF
            print("[{}]recv:EOF".format(os.getpid()))
            break

        data = data.rstrip()
        try:
            print("[{}]{}".format(os.getpid(), data.decode('utf-8')))
        except UnicodeDecodeError:
            pass
        try:
            acc.send(data + b':OK\r\n')
        except InterruptedError as e:
            print("send:{}".format(e))
            break
    acc.close()

def sig_child_handler(sig, frame):
  pid, status = os.wait()
  print('signum={} sig_chld_handler:wait:pid={}, status={}'.format(sig,pid, status))
  print('WIFEXITED:{},WEXITSTATUS:{},WIFSIGNALED:{}\
        WTERMSIG:{},WIFSTOPPED:{},WSTOPSIG:{}'.format(
        os.WIFEXITED(status),
        os.WEXITSTATUS(status),
        os.WIFSIGNALED(status),
        os.WTERMSIG(status),
        os.WIFSTOPPED(status),
        os.WSTOPSIG(status)))

if __name__ == '__main__':
  if (len(sys.argv) != 2):
    print("Usage: {} <server port>.format(sys.argv[0]))
    sys.exit(1)
  signal.signal(signal.SIGCHLD, sig_child_handler)
  soc = server_socket(sys.argv[1])

  print("ready for accept")

  try:
    accept_loop(soc)
    soc.close()
  except KeyboardInterrupt:
    soc.close()
    sys.exit(1)

 

動作確認

それでは、上記プログラムを実行してみます。

 

 > python3 server_multi_fork.py 8888
 port=8888
 ready for accept
 accept:127.0.0.1:55546    # クライアント1接続
 accept:127.0.0.1:55547    # クライアント2接続
 [2481]ok
 [2576]ok
 [2481]recv:EOF            # クライアント1切断
 signum=17 sig_chld_handler:wait:pid=2481, status=0
 WIFEXITED:True,WEXITSTATUS:0,WIFSIGNALED:False WTERMSIG:0,WIFSTOPPED:False,WSTOPSIG:0
 [2576]recv:EOF            # クライアント2切断
 signum=17 sig_chld_handler:wait:pid=2576, status=0
 WIFEXITED:True,WEXITSTATUS:0,WIFSIGNALED:False WTERMSIG:0,WIFSTOPPED:False,WSTOPSIG:0
 
 # プロセスの状態
 > ps -ef | grep server_multi
 vagrant   2479  1854  0 06:36 pts/0    00:00:00 python3 server_multi_fork.py 8888
 vagrant   2481  2479  0 06:36 pts/0    00:00:00 python3 server_multi_fork.py 8888
 vagrant   2576  2479  0 06:37 pts/0    00:00:00 python3 server_multi_fork.py 8888

 

ちゃんと子プロセスが生成されていることが確認できました。

 

最後に

今回はサーバのマルチクライアント化として、fork()を利用した方法について学びました。

次回はthreadingモジュール(pthread)を利用したマルチスレッドによるマルチクライアント化について学んでいきます。

 

参考書籍

Linuxネットワークプログラミングバイブル

TCP/IPソケットプログラミング C言語編