网络阻塞与非阻塞和多路io复用详解

    191

主线是什么

首先为什么需要我们的网络多路复用,阻塞和非阻塞,因为我们需要考虑效率,而客户端和服务端一对一通信显然是不会陷入效率问题,而真正会产生效率问题是因为我们的多线程处理并发的场景,因此主线就是多线程中的主线程和子线程

阻塞在阻塞什么

函数分析

我们不妨分析一下所有的 socket 函数,看看什么适合主线程,什么适合子线程。
首先 socket 函数创建的是一个 sock 结构体,交给内核管理
然后是 bind 函数给 sock 绑定 ip 和端口
之后是 listen 函数初始化半连接队列和全连接队列,并且初始化 sock 状态
再然后是 accept 函数从全连接队列中获取 sock 的 fd
Recv 函数通过这个 fd 获得 sock 中缓冲区的内容,并加以处理
Send 函数通过 fd 获得 sock 中缓冲区的位置,并开始写,又由内核发送

哪些阻塞?

  1. socket():这个函数用于创建一个新的 socket,这个过程通常是非阻塞的。

  2. bind():这个函数用于将 socket 绑定到一个特定的 IP 地址和端口号,这个过程也是非阻塞的。

  3. listen():这个函数用于将 socket 设置为监听模式,等待客户端的连接请求,这个过程也是非阻塞的。

  4. accept():这个函数用于接受客户端的连接请求。如果没有客户端请求连接,这个函数会阻塞,直到有客户端连接为止。

  5. recv():这个函数用于从连接的 socket 中接收数据。如果 socket 中没有数据,这个函数会阻塞,直到有数据可读。

  6. send():这个函数用于向连接的 socket 发送数据。如果发送缓冲区已满,这个函数会阻塞,直到有足够的空间可以发送新的数据。

三个阻塞,分别为 accept() recv() send()

主线程应该是什么,子线程应该是什么,为什么

按照上面的解释,我们应该只考虑 accept() recv() send() 中区分从哪里开始是主线程,哪里是子线程,因为如果是非阻塞的话,那么是不需要分主线程还是子线程的,因为如果不阻塞就不存在要创建多个线程来处理,一个线程就可以直接处理完了。或者换一个角度想,如果主线程出现了阻塞很多,那么整个程序就堵住了,所以为了防止主线程被堵住,应该把经常造成阻塞的函数分给子线程,让子线程自个去阻塞,然后不影响整个进程。

accept(): accept 函数从全连接队列中获取 sock 的 fd
这个操作相当于是去取 sock,如果没有客户端请求,那么就不会有 sock。

那么我们来假设一下,如果是放到子线程会怎么样?

那么主线程此时是没有阻塞状态的对吧?子线程和主线程是相互脱离的,很有可能就是主线程已经结束,而子线程没运行完或者是在阻塞,而导致子线程没有处理完,主线程提前结束

那么我们来假设一下,如果是放到主线程会怎么样?

那么就不会出现上述情况,主线程会一直等待请求(while 循环维持),显示就是主线程正在等待数据对吧。

所以我们把 accept 归类到主线程,保证主线程正常等待客户端通信,防止主线程提前结束

由于我们的收发是一个组合操作所以我们一起考虑
recv():这个函数用于从连接的 socket 中接收数据。如果 socket 中没有数据,这个函数会阻塞,直到有数据可读。
send():这个函数用于向连接的 socket 发送数据。如果发送缓冲区已满,这个函数会阻塞,直到有足够的空间可以发送新的数据。

那么我们来假设一下,如果是放到子线程会怎么样?

那么就是从 accept 得到的 fd 都交给子线程处理,相当于一个线程拥有一个 sock,然后我们把剩余的操作全部交给子线程

那么我们来假设一下,如果是放到主线程会怎么样?

那么就是从 accept 得到的 fd 都交给主线程处理,相当于主线程一次只能处理一个请求,失去了并发性。

所以我们知道了,accept 用于持续接收通信所以必须放在主线程,而 recv 和 send 保证并发就必须放到子线程中

非阻塞在非阻塞什么

非阻塞理解

非阻塞的事实上就是对于阻塞 io 的一种改进,虽然阻塞 io 可以通过多线程技术实现一个效果不错的并发,但是也是以消耗大量资源(线程创建销毁)的代价下完成的。

上面说阻塞是 accept() recv() send() 三个函数导致的,那么非阻塞自然就是使的这是三个函数变成非阻塞的状态。

很显然通过之前的讨论,accept 假装必须阻塞,保证客户端的请求得以响应,但是这不就矛盾了?我们如果阻塞了 accept,那么怎么执行下面的接收和发送?我们只有一个主线程。

很简单,我们对于 accept 的返回值进行一个判断,如果小于 0,那么就是没有接收到就去执行 recv 和 send,如果接收到那么就缓存到数组当中。

然后 recv 和 send 从数组中提取 fd,然后根据返回值,有准备好就发和接收,如果没有就跳过。然后重新进入 accept 循环

这样就实现了非阻塞同时并发的效果

所以非阻塞是非阻塞了 accept() recv() send() 三个函数

非阻塞条件

  • 设置非阻塞 sock
    阻塞:
    int lfd=socket(AF_INET,SOCK_STREAM,0);
    非阻塞:
    int lfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);

  • 初始化 accept接收到 sock 为非阻塞
    #include <fcntl.h>
    fcntl(new_socket_fd, F_SETFL, O_NONBLOCK);

非阻塞代码

#include <sys/socket.h>

#include <netinet/in.h>

#include <unistd.h>

#include <fcntl.h>

#include <stdio.h>

#include <errno.h>

  

#define MAX_CLIENTS 1000

  

int main() {

    int sockfd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);

    struct sockaddr_in serv_addr;

    serv_addr.sin_family = AF_INET;

    serv_addr.sin_addr.s_addr = INADDR_ANY;

    serv_addr.sin_port = htons(9999);

    bind(sockfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));

    listen(sockfd, 5);

    int client_sockets[MAX_CLIENTS];

    for (int i=0; i<MAX_CLIENTS; i++)

        client_sockets[i] = 0;

    while (1) {

        while (1) {

            int new_socket = accept(sockfd, NULL, NULL);

            if (new_socket < 0) {

                if (errno == EAGAIN || errno == EWOULDBLOCK)

                    break;

                else

                    perror("accept");

            }

            fcntl(new_socket, F_SETFL, O_NONBLOCK);

            for (int i=0; i<MAX_CLIENTS; i++) {

                if (client_sockets[i] == 0) {

                    client_sockets[i] = new_socket;

                    break;

                }

            }

        }

        for (int i=0; i<MAX_CLIENTS; i++) {

            if (client_sockets[i] > 0) {

                char buffer[1024];

                int valread = recv(client_sockets[i], buffer, sizeof(buffer), 0);

                if (valread < 0) {

                    if (errno != EAGAIN && errno != EWOULDBLOCK)

                        perror("recv");

                } else if (valread == 0) {

                    close(client_sockets[i]);

                    client_sockets[i] = 0;

                } else {

                    send(client_sockets[i], buffer, valread, 0);

                }

            }

        }

    }

    return 0;

}

多路 io 复用,多路在哪里

我们知道目前的多路 io 复用基本上是 epoll ,但是我们还是需要去从 select 开始讲讲从根源触发到底是怎么回事

多路复用复用在那里?我们不妨看看非阻塞 nio 的实现,其中有一个循环去接收 accept 存到数组当中,这个过程会不断检查是否有准备好的数据,并且 recv 和 send 也会不断去检测是否有准备好的缓冲区。

Select 是什么,select 是将这种轮询给放到内核中,更具体的说,就是把对缓冲区的访问轮询给放到内核中,让内核自己去判断哪一个 fd 对应的读缓冲区或者写缓冲区出现可用的时候,返回 fd_set 类型的链表会标志为 1。

也就是说 select 是把我们非阻塞状态中对缓冲区的检测放到了内核,减少了从用户区到内核区的开销,这个开销也提一下,开销有:

  1. 上下文切换:从用户态转到内核态时,操作系统需要保存用户态的状态,并加载内核态的状态,这一过程称为上下文切换。上下文包括程序计数器、寄存器、内存权限等信息。

  2. 安全性检查:在执行系统调用之前,操作系统会进行必要的安全性检查,以确保调用是合法的。这包括检查调用参数的有效性、权限验证等,这些检查本身也是一种开销。

  3. 系统调用的处理:系统调用的处理通常比普通的函数调用更为复杂和耗时。系统需要在内核中查找相应的服务例程,并执行之,这个过程中可能还会涉及到更多的安全和兼容性检查。

  4. 中断处理:在内核态,操作系统可能还需要处理各种中断和异常,这也会增加额外的开销。

    如何使用 select

    那么从 nio 到 select 变迁说完了,说说怎么使用。

  • Select 的函数

    • 有五个参数:最大的 fd 值读set写set异常set超时结构体

  • Isset 函数

    • 传入 fdfdset 判断是否标志位为 1,如果是则准备好了,可以发送或者接收

  • Select 的主线

    • 主线是通过对 fd 的遍历,用 isset 函数判断已经准备好缓冲区的 sock,随之调用或不调用 recv,send 函数

  • Fd_Set

    • 需要先在 listen 后,通过 fd_zero () 初始化 set,可以是读可以是写可以是异常, 用 fd_set () 设置需要检查的内容,对服务器 sock 的 fd 设置标志位为 1

具体实现如下:

#include <stdio.h>

#include <stdlib.h>

#include <string.h>

#include <sys/socket.h>

#include <netinet/in.h>

#include <unistd.h>

#include <fcntl.h>

#include <errno.h>

  

#define PORT 9999

#define BUFFER_SIZE 1024

  

int make_socket_non_blocking(int fd) {

    int flags = fcntl(fd, F_GETFL, 0);

    if (flags == -1) {

        perror("fcntl");

        return -1;

    }

    flags |= O_NONBLOCK;

    if (fcntl(fd, F_SETFL, flags) == -1) {

        perror("fcntl");

        return -1;

    }

    return 0;

}

  

int main() {

    int server_fd, new_socket;

    struct sockaddr_in address;

    int opt = 1;

    int addrlen = sizeof(address);

    char buffer[BUFFER_SIZE] = {0};

  

    // 创建 socket 文件描述符

    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {

        perror("socket failed");

        exit(EXIT_FAILURE);

    }

  

    // 设置 socket 为非阻塞

    make_socket_non_blocking(server_fd);

  

    // 强制绑定端口

    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {

        perror("setsockopt");

        exit(EXIT_FAILURE);

    }

    address.sin_family = AF_INET;

    address.sin_addr.s_addr = INADDR_ANY;

    address.sin_port = htons(PORT);

  

    // 绑定 socket 到端口

    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address))<0) {

        perror("bind failed");

        exit(EXIT_FAILURE);

    }

    if (listen(server_fd, 3) < 0) {

        perror("listen");

        exit(EXIT_FAILURE);

    }

  

    fd_set read_fds, temp_fds;

    int max_sd = server_fd;

  

    FD_ZERO(&read_fds);

    FD_SET(server_fd, &read_fds);

  

    while (1) {

        temp_fds = read_fds;

        int activity = select(max_sd + 1, &temp_fds, NULL, NULL, NULL);

  

        if ((activity < 0) && (errno != EINTR)) {

            printf("select error");

        }

  

        // 检查是否有新的连接请求

        if (FD_ISSET(server_fd, &temp_fds)) {

            if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen))<0) {

                perror("accept");

                exit(EXIT_FAILURE);

            }

  

            // 设置新 socket 为非阻塞

            make_socket_non_blocking(new_socket);

            FD_SET(new_socket, &read_fds);

            if (new_socket > max_sd) {

                max_sd = new_socket;

            }

        }

  

        // 迭代所有的 socket 描述符

        for (int i = 0; i <= max_sd; i++) {

            if (FD_ISSET(i, &temp_fds)) {

                // 检查是否是已经连接的客户端发来的数据

                if (i != server_fd) {

                    int valread = read(i , buffer, BUFFER_SIZE);

                    if (valread == 0) {

                        // 客户端断开了连接

                        close(i);

                           FD_CLR(i, &read_fds); // 从文件描述符集合中移除

                    } else if (valread < 0) {

                        // 发生了读取错误

                        if (errno != EAGAIN) {

                            perror("read");

                            close(i);

                            FD_CLR(i, &read_fds);

                        }

                    } else {

                        // 从客户端收到了数据,可以在这里处理

                        buffer[valread] = '\0';

                        printf("Received: %s\n", buffer);

                        // 回写数据到客户端(Echo服务器)

                        send(i, buffer, valread, 0);

                    }

                }

            }

        }

    }

  

    // 关闭监听的 socket 描述符

    close(server_fd);

    return 0;

}

Poll 的思路

Poll 思路上和 select 是相近的,由于 select 他设置为 1024 个最大连接数量,这个是设定好的,由于考虑到兼容性以及并发的需要,所以出了 poll 这种理论上无限连接的模式。

实现:

#include <stdio.h>

#include <stdlib.h>

#include <string.h>

#include <sys/socket.h>

#include <netinet/in.h>

#include <unistd.h>

#include <fcntl.h>

#include <errno.h>

#include <poll.h>

  

#define PORT 9999

#define BUFFER_SIZE 1024

#define MAX_CLIENTS 256

  

int make_socket_non_blocking(int fd) {

    int flags = fcntl(fd, F_GETFL, 0);

    if (flags == -1) {

        perror("fcntl");

        return -1;

    }

    flags |= O_NONBLOCK;

    if (fcntl(fd, F_SETFL, flags) == -1) {

        perror("fcntl");

        return -1;

    }

    return 0;

}

  

int main() {

    int server_fd, new_socket;

    struct sockaddr_in address;

    int opt = 1;

    int addrlen = sizeof(address);

    char buffer[BUFFER_SIZE] = {0};

  
  

    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) == 0) {

        perror("socket failed");

        exit(EXIT_FAILURE);

    }

  

    make_socket_non_blocking(server_fd);

  

    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {

        perror("setsockopt");

        exit(EXIT_FAILURE);

    }

    address.sin_family = AF_INET;

    address.sin_addr.s_addr = INADDR_ANY;

    address.sin_port = htons(PORT);

  

    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {

        perror("bind failed");

        exit(EXIT_FAILURE);

    }

    if (listen(server_fd, 3) < 0) {

        perror("listen");

        exit(EXIT_FAILURE);

    }

  

    struct pollfd fds[MAX_CLIENTS];

    int nfds = 1;

    fds[0].fd = server_fd;

    fds[0].events = POLLIN;

  

    while (1) {

        int poll_count = poll(fds, nfds, -1);

  

        if (poll_count < 0) {

            perror("poll");

            exit(EXIT_FAILURE);

        }

  

        for (int i = 0; i < nfds; i++) {

            if (fds[i].revents & POLLIN) {

                if (fds[i].fd == server_fd) {

                    if ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) < 0) {

                        perror("accept");

                        continue;

                    }

                    make_socket_non_blocking(new_socket);

                    fds[nfds].fd = new_socket;

                    fds[nfds].events = POLLIN;

                    nfds++;

                } else {

                    int valread = read(fds[i].fd, buffer, BUFFER_SIZE);

                    if (valread == 0) {

                        close(fds[i].fd);

                        fds[i].fd = -1;

                    } else if (valread < 0) {

                        if (errno != EAGAIN) {

                            perror("read");

                            close(fds[i].fd);

                            fds[i].fd = -1;

                        }

                    } else {

                        buffer[valread] = '\0';

                        printf("Received: %s\n", buffer);

                        send(fds[i].fd, buffer, valread, 0);

                    }

                }

            }

        }

  

        for (int i = 0; i < nfds; i++) {

            if (fds[i].fd == -1) {

                for (int j = i; j < nfds - 1; j++) {

                    fds[j] = fds[j + 1];

                }

                nfds--;

            }

        }

    }

  

    close(server_fd);

    return 0;

}

Epoll 的思路

Epoll 的思路是想要解决掉上述的 select/poll 的轮训,因为 isset 在辅助我们判断缓冲区是否好的前提是内核已经帮我们处理了对每一个缓冲区是否有数据进行一个判断,而这个操作也是十分消耗资源的,特别是连接一多的情况下,那么我们需要在内核轮询的也就更多了。

那么有没有一种不需要轮询的操作呢,那么就是我们直接对我们的每一个 fd 进行一个单一的监听,然后如果这个监听监听到数据就把 fd 放到就绪的空间内,然后我们需要的时候直接从这个区域中取出来就可以了,就不需要去轮询是否有数据。

epoll 通过以下步骤优化了这一过程:

  1. 使用 epoll_create 创建一个 epoll 实例,这个实例将被用来存储和跟踪各个文件描述符的状态。

  2. 通过 epoll_ctl 添加或修改文件描述符的监听事件。这个调用使得内核得以知道哪些文件描述符被监控以及监控哪种类型的事件。

  3. 当调用 epoll_wait 时,此函数会阻塞直到至少有一个监控的文件描述符发生了事件。与 selectpoll 不同的是,epoll_wait 已经不需要遍历所有文件描述符,而是直接返回那些已经触发事件的文件描述符。这样大大减少了不必要的检查和等待时间。

也就是使用 epollcreate 创建一个代理对象,这个相当于是 select 的 select 函数的调用,拥有等待,就绪的队列,用于存放 sock,如果此时执行 epoll_wait 就会把当前的 sock 添加到等待队列当中,如果是网卡接受到数据的时候会向 cpu 发出中断,这个中断执行指令,把对应的 sock 索引从等待队列添加到就绪队列,此时会重新唤起我们的进程,此时 epoll_wait 就会解除阻塞,同时返回我们的就绪队列,我们可以根据就绪队列来获得所有已经就绪的 sock,之后随便我们来 read,或者 send

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>
#include <sys/epoll.h>

#define PORT 9999
#define BUFFER_SIZE 1024
#define MAX_EVENTS 256

int make_socket_non_blocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    if (flags == -1) {
        perror("fcntl");
        return -1;
    }
    flags |= O_NONBLOCK;
    if (fcntl(fd, F_SETFL, flags) == -1) {
        perror("fcntl");
        return -1;
    }
    return 0;
}

int main() {
    int server_fd, new_socket;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);
    char buffer[BUFFER_SIZE] = {0};

    server_fd = socket(AF_INET, SOCK_STREAM, 0);
    if (server_fd == 0) {
        perror("socket failed");
        exit(EXIT_FAILURE);
    }

    make_socket_non_blocking(server_fd);

    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }

    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons(PORT);

    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address)) < 0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }

    if (listen(server_fd, 3) < 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }

    int epoll_fd = epoll_create1(0);
    if (epoll_fd == -1) {
        perror("epoll_create1");
        exit(EXIT_FAILURE);
    }

    struct epoll_event event;
    struct epoll_event events[MAX_EVENTS];

    event.events = EPOLLIN;
    event.data.fd = server_fd;
    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, server_fd, &event) == -1) {
        perror("epoll_ctl: server_fd");
        exit(EXIT_FAILURE);
    }

    while (1) {
        int n = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);
        if (n == -1) {
            perror("epoll_wait");
            exit(EXIT_FAILURE);
        }

        for (int i = 0; i < n; i++) {
            if (events[i].data.fd == server_fd) {
                while ((new_socket = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen)) != -1) {
                    make_socket_non_blocking(new_socket);
                    event.events = EPOLLIN | EPOLLET;
                    event.data.fd = new_socket;
                    if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, new_socket, &event) == -1) {
                        perror("epoll_ctl: add");
                        close(new_socket);
                    }
                }
                if (errno != EAGAIN && errno != EWOULDBLOCK) {
                    perror("accept");
                }
            } else {
                int done = 0;
                while (1) {
                    ssize_t count = read(events[i].data.fd, buffer, BUFFER_SIZE - 1);
                    if (count == -1) {
                        if (errno != EAGAIN) {
                            perror("read");
                            done = 1;
                        }
                        break;
                    } else if (count == 0) {
                        done = 1;
                        break;
                    }
                    buffer[count] = '\0';
                    printf("Received: %s\n", buffer);
                    send(events[i].data.fd, buffer, count, 0);
                }
                if (done) {
                    printf("Closed connection on descriptor %d\n", events[i].data.fd);
                    close(events[i].data.fd);
                }
            }
        }
    }

    close(server_fd);
    return 0;
}

总结

事实上,上面的 io 复用都是基于单线程的情况下处理大量并发,但是为缓解线程压力也可以引入多线程来分担线程压力。

最后希望本文对你有所帮助!

作者:FichteHerbst.
文章地址:
版权声明: 本站所有文章除特别声明外 ,均采用 CC BY-NC-SA 4.0 协议。
非商业转载及引用请注明出处(作者、原文链接),商业转载请联系作者获得授权。

消息盒子

# 暂无消息 #

只显示最新10条未读和已读信息