Engineering Note

プログラミングなどの技術的なメモ

サーバソケットの多重化(epoll())(Pythonによるネットワークプログラミング)

client

本記事は、Pythonによるネットワークプログラミングについての学習メモとなります。

参考書籍としてLinuxネットワークプログラミングバイブルを用い、同書の内容に沿ったかたちで、Pythonに書き直しをしていきます。

今回は、サーバソケットのマルチクライアント化(多重化)の方法として、epoll()を利用した方法について学んでいきます。

 

 

ネットワークプログラミングについて

ネットワークプログラミングの目的はデータの送受信をすることで、そのためにソケットというインターフェースを利用します。

このソケットは、TCP/IPの誕生時にBSD Uinux上に実装されたものですが、非常に使い勝手が良かったため、WindowsなどのUnix系以外のOSでも利用されています。

Pythonで実装する際の手順やオプションの設定なども、概ねそのままプログラミングすることができます。

 

多重化とは

通常のサーバソケットでは、accept()(もしくはrecv())を呼び出した後、それ以降の処理をブロックしてしまいます(ブロッキングIO)。

そのため、他のコネクションが確立したソケットは、その処理が完了するまで待機させられ、クライアントごとの同時接続ができない使い勝手の悪いアプリケーションになってしまいます。

これらの問題を解決するためには、多重化と呼ばれる技術を使い、マルチクライアント化に対応する必要があります。

 

epoll()によるマルチクライアント化

前回はpoll()を用いた多重化について学びました。

 

 

接続するクライアントが少ない場合は、前回で学んだselect()やpoll()で行ったほうが処理速度としても問題はありませんが、クライアントが多くなった場合にループ処理の箇所での負荷がかかります。

それに比べてepoll()では、接続準備ができたディスクリプタのみ通知され、それに対してのみループ処理を実行するため、その分パフォーマンスが向上します。

 

監視するIOイベント情報の詳細は以下になります。

定数

意味

EPOLLIN

読み込み可能

EPOLLOUT

書き込み可能

EPOLLPRI

緊急の読み出しデータ

EPOLLERR

設定された fd にエラー状態が発生した

EPOLLHUP

設定された fd がハングアップした

EPOLLET

エッジトリガ動作に設定する。デフォルトではレベルトリガ動作

EPOLLONESHOT

1ショット動作に設定する。1回イベントが取り出されたら、その fd が内部で無効になる

EPOLLEXCLUSIVE

関連づけられた fd にイベントがある場合、1 つの epoll オブジェクトのみを起こします。デフォルトでは (このフラグが設定されていない場合には)、fd に対してポーリングするすべての epoll オブジェクトを起こします。

EPOLLRDHUP

ストリームソケットの他端が接続を切断したか、接続の書き込み側のシャットダウンを行った。

EPOLLRDNORM

EPOLLIN と同じ

EPOLLRDBAND

優先データバンドを読み込める。

EPOLLWRNORM

EPOLLOUT と同じ

EPOLLWRBAND

優先データに書き込みできる。

EPOLLMSG

無視される。


                   https://docs.python.org/ja/3/library/select.htmlより抜粋

 

なお、タイムアウト値は秒単位で指定をします。

 

サーバプログラムの作成

今回は前回作成したサーバプログラムにepoll()によるマルチクライアント機能を実装していきます。

 

 

以下がソースコードになります。

 

# server_multi_epoll.py
import socket
import sys
import fcntl
import os
import errno
import select

MAX_CHILD = 20

def server_socket(portnum):
  try:
    for res in socket.getaddrinfo(None, portnum, socket.AF_INET, socket.SOCK_STREAM, socket.IPPROTO_TCP, socket.AI_PASSIVE):
      af, socktype, proto, canonname, sa = res
      break
  except socket.gaieor as e:
    print("getaddrinfo():{}".format(e))
    sys.exit(1)

  try:
    nbuf, sbuf = socket.getnameinfo(sa, socket.AI_PASSIVE)
  except socket.gaieor as e:
    print("getnameinfo():{}".format(e))
    sys.exit(1)

  print("port={}".format(sbuf))

  try:
    soc = socket.socket(af, socktype, proto)
  except OSError as e:
    print("socket:{}".format(e))
    sys.exit(1)

  try:
    soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  except OSError as e:
    print("setsockopt:{}".format(e))
    sys.exit(1)

  try:
    soc.bind(sa)
  except OSError as e:
    print("bind:{}".format(e))
    soc.close()
    sys.exit(1)

  try:
    soc.listen(socket.SOMAXCONN)
  except OSError as e:
    print("listen:{}".format(e))
    soc.close()
    sys.exit(1)

  return soc

def accept_loop(soc):
  timeout = 10
  ev = select.epoll()
  ev.register(soc, select.EPOLLIN)
  count = 0
  child = []
  print("<<child count:{}>>".format(count))  
  while True:
    try:
      try:
        r = ev.poll(timeout)
      except OSError as e:
        print("poll:{}".format(e))
        break    
      if not r:
          print("poll:timeout")
      else:
        for fd, event in r:
          if fd == soc.fileno() and event & select.EPOLLIN:
            acc, addr = soc.accept()
            hbuf, sbuf = socket.getnameinfo(addr, socket.NI_NUMERICHOST | socket.NI_NUMERICSERV)
            print("accept:{}:{}".format(hbuf, sbuf))
            if len(child) > MAX_CHILD:
              print("child is full: cannot accept")
              acc.close()
            else:
              ev.register(acc)
              child.append(acc)
              count += 1
              print("<<child count:{}>>".format(count))
          
          elif event & (select.EPOLLIN | select.EPOLLERR):
            for i, c in enumerate(child):
              if c.fileno() == fd:
                if send_recv(c, i+1) == -1:
                  child.remove(c)
                  ev.unregister(c)
                  count -= 1
                print("<<child count:{}>>".format(count))
                
    except InterruptedError as e:
      if e.errno != errno.EINTR:
        print("accept:{}".format(e))

def send_recv(acc, child_no):
  buf_size = 512
  try:
    data = acc.recv(buf_size)
  except InterruptedError as e:
    print("recv:{}".format(e))
    return -1

  if (len(data) == 0):
    # EOF
    print("[child{}]recv:EOF".format(child_no))
    return -1

  data = data.rstrip()
  try:
    print("[child{}]{}".format(child_no, data.decode('utf-8')))
  except UnicodeDecodeError:
    pass
  try:
    acc.send(data + b':OK\r\n')
  except InterruptedError as e:
    print("send:{}".format(e))
    return -1
  return 0

if __name__ == '__main__':
  if (len(sys.argv) != 2):
    print("Usage: {} <server port>".format(sys.argv[0]))
    sys.exit(1)

  soc = server_socket(sys.argv[1])

  print("ready for accept")

  try:
    accept_loop(soc)
    soc.close()
  except KeyboardInterrupt:
    soc.close()
    sys.exit(1)

 

動作確認

それでは、上記プログラムを実行してみます。

 

 > python3 server_multi_epoll.py 8888
 port=8888
 ready for accept
 <<child count:0>>
 accept:127.0.0.1:52819        # クライアント1接続
 <<child count:1>>
 [child1]ok
 <<child count:1>>
 accept:127.0.0.1:52820        # クライアント2接続
 <<child count:2>>
 [child2]ok
 <<child count:2>>
 [child1]recv:EOF              # クライアント1切断
 <<child count:1>>
 [child1]recv:EOF              # クライアント2切断
 <<child count:0>>

 

最後に

今回はサーバのマルチクライアント化として、poll()を利用した方法について学びました。

次回はfork()を利用したマルチプロセスによるマルチクライアント化について学んでいきます。

 

参考書籍

Linuxネットワークプログラミングバイブル

TCP/IPソケットプログラミング C言語編