io

在Unix/Linux中的所有东西都是一个文件,每个进程都有一个文件描述符表,这些描述符指向文件、Socket、devices和其他操作系统对象。操作系统一般都是与多个IO源一起工作的,这些系统一般先经过一个初始化阶段,然后进入某种待机模式–等待任何客户端发送请求然后进行响应。

同时处理多个IO源很常见,比如:

  • 同时处理交互式输入和一个网络Socket
  • 同时处理多个Socket
  • TCP服务器同时处理一个监听Socket及其连接的Socket
  • 服务器同时处理TCP和UDP
  • 服务器处理多个服务,可能还有多个协议

那么问题来了,操作系统怎么应对这个场景呢?比较简单的解决方案是为每个client创建一个线程(或进程),然后阻塞在read上直到收到请求。client少的话没问题,但是如果扩展到上百个client就不太合适了(这里的client不一定是网络客户端,用户输入也算)。

怎么使用一个进程/线程来处理呢?在Unix中有五种I/O模式:

  • blocking I/O
  • nonblocking I/O
  • I/O multiplexing (select, poll and epoll)
  • signal driven I/O (SIGIO)
  • asynchronous I/O (the POSIX aio_ functions)

这篇文章主要集中在I/O multiplexing(IO多路复用),在介绍I/O多路复用之前,我们先用blocking I/O来解释一下为什么需要多路复用。

blocking I/O

阻塞式I/O模型可以说是最普遍的I/O模型了,默认情况下,所有的Socket都是阻塞的,如下所示(图中过程是基于UDP的,TCP更复杂一下,需要一些额外变量)

image-20210228121221097

进程调用recvfrom,系统调用不会返回,直到数据报到达并复制到我们的应用程序缓冲区,或者出现错误,最常见的错误是系统调用被信号打断。

试想一下,如果网络客户端同时处理两个输入:标准输入和TCP Socket,客户端在调用fgets(在标准输入上)时被阻塞。此时若服务器进程被杀死,服务器TCP会发送一个FIN给客户端TCP,但是由于客户端进程被阻塞在从标准输入读取数据上,可能很晚才会读取到服务器发的数据。

我们希望在一个或多个I/O条件准备好时得到通知(即输入准备好被读取,或者描述符能够接受更多的输出),这种能力被称为I/O多路复用。

I/O multiplexing

I/O多路复用的原理就是使用内核机制对一组文件描述符进行轮询。在Linux中,有三种方案可以使用:

  • select(2)
  • poll(2)
  • epoll*

这三种方案思路都是一样的:

创建一组文件描述符,告诉内核你想对每个文件描述符做什么(读,写,…),然后用一个线程阻塞一个函数调用,直到至少有一个文件描述符请求的操作可用。但是细节上还是有一些差别,我们逐个来看。

select

selec()系统调用长这个样子

1
2
3
4
5
6
7
#include <sys/select.h>
#include <sys/time.h>

int select(int maxfdp1, fd_set *readset, fd_set *writeset, fd_set *exceptset,
           const struct timeval *timeout);

/* Returns: positive count of ready descriptors, 0 on timeout, –1 on error */

调用select()之后就会阻塞,直到给定的文件描述符准备好执行I/O,或者超时。

被监控的文件描述符分为三个集合(对应select的第2, 3, 4三个参数):

  • readfds:查看集合中文件描述符是否有数据可以读取
  • writefds: 查看集合中文件描述符是否可以在没有阻塞的情况下完成写操作
  • exceptfds:查看集合中文件描述符是否发生了异常,或者是否有带外数据 可用(这些状态只适用于套接字)

一个给定的集合可能是NULL,在这种情况下,select( )不监控该事件。

当成功返回时,每个集合都会被修改成只包含准备好进行I/O操作的文件描述符。由于select会修改文件描述符集合中的内容,只返回那些满足条件的描述符,所以我们每次调用select的时候都需要重新生成一遍完整的集合,把那些被select移除的文件描述符添加回来。

fd_set

这里有必要多说一点关于文件描述符集合 fd_set的东西,fd_set是用一个integer数组实现的,每个bit都对应一个描述符(Windows不是这样):

image-20210228124822891

上图的例子中,传给select的fdset allset中包括3,4和6三个文件描述符,返回的rset中只有3这个bit是1,这就是说只有3准备好了。而select会检查allset中的的bit,这就需要我们告诉select最大的bit是什么,防止select做很多无用功,这就是select的第一个参数maxfdp1的作用。从maxfdp1这个名字也能看出来,select实际需要的是maxfd +1

假设我们的文件描述符是 [1, 3, 5, 800],我们给select传的第一个参数就应该是801,虽然只有4个文件描述符,但select还是要检查0到800的每一位。对于这个问题,有一个替代select的方法 – pselect,它可以在等待时添加一个信号掩码。

example

看一个select实际使用的例子

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
#include <stdio.h>
#include <stdlib.h>
#include <sys/select.h>

int main(void)
{
    fd_set rfds;
    struct timeval tv;
    int retval;

    /* 检查stdin(fd 0),看它何时有输入 */
    FD_ZERO(&rfds);
    FD_SET(0, &rfds);

    /* timeout设置成5s */
    tv.tv_sec = 5;
    tv.tv_usec = 0;

    retval = select(1, &rfds, NULL, NULL, &tv);

    if (retval == -1)
        perror("select()");
    else if (retval)
        printf("Data is available now.\n");
        /* 此时 FD_ISSET(0, &rfds) 的结果true. */
    else
        printf("No data within five seconds.\n");

    exit(EXIT_SUCCESS);
}

一般我们会用select来对多个文件描述符进行选择,这需要在select返回之后用FD_ISSET(fds[i], &rfds)逐个检查结果。

总结一些select的问题

  • 需要在每次调用前逐个建立fd_set,因为select会修改set里的内容
  • select逐个检查0maxfdp1的每个bit,这是O(n)的操作
  • select返回之后,需要对文件描述符进行遍历,逐个检查它是否存在于返回的set中

要说好处的话,可能就是select比较广泛,基本所有的Unix系统都支持。

poll

pollselect很像,但是它引入了一个pollfd struct,改进select里的文件描述符,不需要每次调用都创建一遍fd_set。

1
2
3
4
5
#include <poll.h>

int poll (struct pollfd *fdarray, unsigned long nfds, int timeout);

/* Returns: count of ready descriptors, 0 on timeout, –1 on error */

pollfd的定义如下

1
2
3
4
5
struct pollfd {
  int     fd;       /* descriptor to check */
  short   events;   /* events of interest on fd */
  short   revents;  /* events that occurred on fd */
};

select将fd绑定在事件上,为每种事件创建一个fd集合。poll将它反了过来,将事件绑定在fd上,对于每个fd,建立一个类型为pollfd的对象,并填入所需的事件,在poll返回后,检查revents字段就可以了。

example

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
#include <poll.h>
#include <fcntl.h>
#include <sys/types.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>

#define errExit(msg) do { perror(msg); exit(EXIT_FAILURE); } while (0)

int main(int argc, char *argv[])
{
    int nfds, num_open_fds;
    struct pollfd *pfds;

    if (argc < 2) {
      fprintf(stderr, "Usage: %s file...\n", argv[0]);
      exit(EXIT_FAILURE);
    }

    num_open_fds = nfds = argc - 1;
    pfds = calloc(nfds, sizeof(struct pollfd));
    if (pfds == NULL)
        errExit("malloc");

    /* 在命令行中打开每个文件,并将其添加到'pfds'数组中。 */

    for (int j = 0; j < nfds; j++) {
        pfds[j].fd = open(argv[j + 1], O_RDONLY);
        if (pfds[j].fd == -1)
            errExit("open");

        printf("Opened \"%s\" on fd %d\n", argv[j + 1], pfds[j].fd);

        pfds[j].events = POLLIN;
    }

    /* 只要至少有一个文件描述符被打开,就继续调用poll() */ 
    while (num_open_fds > 0) {
        int ready;

        printf("About to poll()\n");
        ready = poll(pfds, nfds, -1);
        if (ready == -1)
            errExit("poll");

        printf("Ready: %d\n", ready);

        /* 处理 poll() 返回的文件描述符中的 revents */

        for (int j = 0; j < nfds; j++) {
            char buf[10];

            if (pfds[j].revents != 0) {
                printf("  fd=%d; events: %s%s%s\n", pfds[j].fd,
                        (pfds[j].revents & POLLIN)  ? "POLLIN "  : "",
                        (pfds[j].revents & POLLHUP) ? "POLLHUP " : "",
                        (pfds[j].revents & POLLERR) ? "POLLERR " : "");

                if (pfds[j].revents & POLLIN) {
                    ssize_t s = read(pfds[j].fd, buf, sizeof(buf));
                    if (s == -1)
                        errExit("read");
                    printf("    read %zd bytes: %.*s\n",
                            s, (int) s, buf);
                } else {                /* POLLERR | POLLHUP */
                    printf("    closing fd %d\n", pfds[j].fd);
                    if (close(pfds[j].fd) == -1)
                        errExit("close");
                    num_open_fds--;
                }
            }
        }
    }

    printf("All file descriptors closed; bye\n");
    exit(EXIT_SUCCESS);
}

select相同的是,我们需要检查每个pollfd对象,看看它的文件描述符是否准备好了,但我们不需要每次迭代都建立整个集合。

通过上边的描述可以看到,在使用selectpoll时,我们在用户空间管理一切,在每次调用时都会发送一个集合然后等待。如果要添加另一个套接字,我们需要先把它添加到集合上,然后再次调用select/poll,可以用下图来描述:

image-20210228140432674

还有一种不同的方案,那就是epoll*,它在内核空间管理context。

epoll*

epoll*实际上是一组系统调用:epoll_create(2), epoll_create1(2), epoll_ctl(2), epoll_wait(2)。这些系统调用帮助我们在内核中创建和管理context,可以分为三个步骤:

  1. 使用epoll_create在内核中创建一个context
  2. 使用epoll_ctl在context中添加和删除文件描述符
  3. 使用epoll_wait在context中等待事件发生

example

为了简洁,下边的代码中没有考虑错误处理。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
#define MAX_EVENTS 5
#define READ_SIZE 10
#include <stdio.h> 
#include <unistd.h>
#include <sys/epoll.h>
#include <string.h>
 
int main()
{
    int running = 1, event_count, i;
    size_t bytes_read;
    char read_buffer[READ_SIZE + 1];
    struct epoll_event event, events[MAX_EVENTS];

    /* 创建epoll context */
    int epoll_fd = epoll_create1(0);
  
    event.events = EPOLLIN;
    event.data.fd = 0;
  
    /* 添加fd和对应的事件 */
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, 0, &event);
  
    while(running)
    {
        /* 等待事件 */
        event_count = epoll_wait(epoll_fd, events, MAX_EVENTS, 30000);
        for(i = 0; i < event_count; i++)
        {
            printf("Reading file descriptor '%d' -- ", events[i].data.fd);
            bytes_read = read(events[i].data.fd, read_buffer, READ_SIZE);
            printf("%zd bytes read.\n", bytes_read);
            read_buffer[bytes_read] = '\0';
            printf("Read '%s'\n", read_buffer);
      
            if(!strncmp(read_buffer, "stop\n", 5))
              running = 0;
        }
    }
  
    /* 关闭 epoll context */
    close(epoll_fd);
    return 0;
}

Level-triggered and edge-triggered

epoll对文件描述符的操作有两种模式:LT(level trigger, 水平触发)ET(edge trigger, 边缘触发)

  • LT模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。
  • ET模式:当epoll_wait检测到描述符事件发生并将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。

其中LT是默认模式,而ET可以理解成高速模式,但是它只支持no-block Socket。

设想这样一个场景:

  1. 一个管道的读端的文件描述符 rfd被注册到epoll context中
  2. 管道的写端写入2KB的数据
  3. 调用 epoll_wait(2)之后,返回rfd,因为此时rfd作为读端已经有数据准备好可以读了
  4. 管道的读端读取了1KB的数据
  5. 再次调用epoll_wait(2)

如果rfd在被添加到epoll context中的时候用的是ET模式,在第5步中完成epoll_wait(2)之后可能会挂起,尽管文件输入缓冲区中仍有可用的数据,此时写端甚至可能正在等之前发送数据的响应呢。其原因是ET模式只有当被监控的文件描述符发生变化时才会发送事件。 因此,在步骤5中调用者可能最终会停留在等待一些已经存在于输入缓冲区内的数据。

为了避免在使用EPOLLET模式时的阻塞,可以用下面两个方法:

  1. 使用非阻塞文件描述符
  2. read(2)write(2)返回EAGAIN之后再等待事件

当缓冲区里没有数据可读的时候,此时继续read就会返回EAGAIN错误,这也意味着可以进行下次等待了。

References

http://www.cs.toronto.edu/~krueger/csc209h/f05/lectures/Week11-Select.pdf

https://notes.shichao.io/unp/ch6/

https://devarea.com/linux-io-multiplexing-select-vs-poll-vs-epoll/#.YDeWM5Mzat4

https://man7.org/linux/man-pages/man2/select.2.html

https://man7.org/linux/man-pages/man7/epoll.7.html