CppGuide社区 CppGuide社区
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
🔥C++面试
  • 第1章 C++ 惯用法与Modern C++篇
  • 第2章 C++开发工具与调试进阶
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 网络通信协议设计
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 服务其他模块设计
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 🔥C++游戏编程入门(零基础学C++)
  • 🔥使用C++17从零开发一个调试器 (opens new window)
  • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
  • 🔥使用C++从零写一个C语言编译器 (opens new window)
  • 🔥从零用C语言写一个Redis
  • leveldb源码分析
  • libevent源码分析
  • Memcached源码分析
  • TeamTalk源码分析
  • 优质源码分享 (opens new window)
  • 🔥远程控制软件gh0st源码分析
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • 高效Go并发编程
  • Go性能调优
  • Go项目架构设计
  • 🔥使用Go从零开发一个数据库
  • 🔥使用Go从零开发一个编译器 (opens new window)
  • 🔥使用Go从零开发一个解释器 (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
首页
  • 🔥最新谷歌C++风格指南(含C++17/20)
  • 🔥C++17详解
  • 🔥C++20完全指南
  • 🔥C++23快速入门
🔥C++面试
  • 第1章 C++ 惯用法与Modern C++篇
  • 第2章 C++开发工具与调试进阶
  • 第3章 C++多线程编程从入门到进阶
  • 第4章 C++网络编程重难点解析
  • 第5章 网络通信故障排查常用命令
  • 第6章 网络通信协议设计
  • 第7章 高性能服务结构设计
  • 第8章 Redis网络通信模块源码分析
  • 第9章 服务其他模块设计
  • 🚀 全部章节.pdf 下载 (opens new window)
  • 🔥C++游戏编程入门(零基础学C++)
  • 🔥使用C++17从零开发一个调试器 (opens new window)
  • 🔥使用C++20从零构建一个完整的低延迟交易系统 (opens new window)
  • 🔥使用C++从零写一个C语言编译器 (opens new window)
  • 🔥从零用C语言写一个Redis
  • leveldb源码分析
  • libevent源码分析
  • Memcached源码分析
  • TeamTalk源码分析
  • 优质源码分享 (opens new window)
  • 🔥远程控制软件gh0st源码分析
  • 🔥Windows 10系统编程
  • 🔥Linux 5.x内核开发与调试 完全指南 (opens new window)
  • TCP源码实现超详细注释版.pdf (opens new window)
  • 高效Go并发编程
  • Go性能调优
  • Go项目架构设计
  • 🔥使用Go从零开发一个数据库
  • 🔥使用Go从零开发一个编译器 (opens new window)
  • 🔥使用Go从零开发一个解释器 (opens new window)
Rust编程指南
  • SQL零基础指南
  • MySQL开发与调试指南
GitHub (opens new window)
  • 从零用C语言写一个Redis 引言
  • 02. 套接字入门
  • 03. 简易服务器/客户端
  • 04. 协议解析
  • 05. 事件循环与非阻塞I/O
  • 06. 事件循环的实现
    • 06. 事件循环的实现
      • 练习:
  • 07. 基础服务器:实现get、set、del功能
  • 08. 数据结构:哈希表
  • 09. 数据序列化
  • 10. AVL树:实现与测试
  • 11. AVL树和有序集合
  • 12. 事件循环和定时器
  • 13. 堆数据结构和生存时间(TTL)
  • 14. 线程池与异步任务
目录

06. 事件循环的实现

# 06. 事件循环的实现

这一章带大家瞧瞧一个回显服务器(echo server)的真实C++代码。先看看Conn结构体的定义:

enum {
    STATE_REQ = 0,
    STATE_RES = 1,
    STATE_END = 2,   // 标记这个连接,准备删除它
};

struct Conn {
    int fd = -1;
    uint32_t state = 0;           // 取值为STATE_REQ 或 STATE_RES
    // 读缓冲区
    size_t rbuf_size = 0;
    uint8_t rbuf[4 + k_max_msg];
    // 写缓冲区
    size_t wbuf_size = 0;
    size_t wbuf_sent = 0;
    uint8_t wbuf[4 + k_max_msg];
};
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

我们需要读写缓冲区,毕竟在非阻塞模式下,输入输出(IO)操作经常得往后推一推。

这个state(状态)是用来决定怎么处理连接的。一个进行中的连接有两种状态,STATE_REQ是用来读取请求的,STATE_RES则是用来发送响应的。

下面是事件循环的代码:

int main()  {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)  {
        die("socket()");
    }

    // 绑定、监听等等操作
    // 代码省略...

    // 一个存放所有客户端连接的映射,用文件描述符(fd)作为键
    std::vector<Conn *> fd2conn;

    // 将监听的文件描述符设置为非阻塞模式
    fd_set_nb(fd);

    // 事件循环
    std::vector<struct pollfd> poll_args;
    while (true)  {
        // 准备poll()的参数
        poll_args.clear();
        // 为了方便,把监听的文件描述符放在第一个位置
        struct pollfd pfd = {fd, POLLIN, 0};
        poll_args.push_back(pfd);
        // 连接的文件描述符
        for (Conn *conn : fd2conn)  {
            if (!conn)  {
                continue;
            }
            struct pollfd pfd = {};
            pfd.fd = conn->fd;
            pfd.events = (conn->state == STATE_REQ)? POLLIN : POLLOUT;
            pfd.events = pfd.events | POLLERR;
            poll_args.push_back(pfd);
        }

        // 轮询(poll)活跃的文件描述符
        // 这里的超时参数没啥用,随便设个大点儿的数就行
        int rv = poll(poll_args.data(), (nfds_t)poll_args.size(), 1000);
        if ( rv < 0)  {
            die("poll");
        }

        // 处理活跃的连接
        for (size_t i = 1; i < poll_args.size(); ++i)  {
            if (poll_args[i].revents)  {
                Conn *conn = fd2conn[poll_args[i].fd];
                connection_io(conn);
                if (conn->state == STATE_END)  {
                    // 客户端正常关闭,或者出了啥问题
                    // 销毁这个连接
                    fd2conn[conn->fd] = NULL;
                    (void)close(conn->fd);
                    free(conn);
                }
            }
        }

        // 如果监听的文件描述符活跃,尝试接受一个新连接
        if (poll_args[0].revents)  {
            (void)accept_new_conn(fd2conn, fd);
        }
    }

    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65

事件循环里,咱们首先要做的就是设置poll的参数。监听的文件描述符用POLLIN标志来轮询。对于连接的文件描述符,Conn结构体的状态决定了轮询标志。在这个例子里,轮询标志要么是读(POLLIN),要么是写(POLLOUT),不会同时出现。要是用epoll的话,事件循环里通常第一件事就是用epoll_ctl更新文件描述符集合。

poll还有个超时参数,这个参数可以用来实现定时器。在咱们这儿,这参数不重要,随便设个大数就行。poll返回之后,我们就能知道哪些文件描述符可以读写啦,然后就可以采取相应行动。

accept_new_conn()函数负责接受新连接,并且创建Conn结构体对象:

static void conn_put(std::vector<Conn *> &fd2conn, struct Conn *conn)  {
    if (fd2conn.size() <= (size_t)conn->fd)  {
        fd2conn.resize(conn->fd + 1);
    }
    fd2conn[conn->fd] = conn;
}

static int32_t accept_new_conn(std::vector<Conn *> &fd2conn, int fd)  {
    // 接受连接
    struct sockaddr_in client_addr = {};
    socklen_t socklen = sizeof(client_addr);
    int connfd = accept(fd, (struct sockaddr *)&client_addr, &socklen);
    if (connfd < 0)  {
        msg("accept() error");
        return -1;   // 出错了
    }

    // 将新连接的文件描述符设置为非阻塞模式
    fd_set_nb(connfd);
    // 创建Conn结构体
    struct Conn *conn = (struct Conn *)malloc(sizeof(struct Conn));
    if (!conn)  {
        close(connfd);
        return -1;
    }
    conn->fd = connfd;
    conn->state = STATE_REQ;
    conn->rbuf_size = 0;
    conn->wbuf_size = 0;
    conn->wbuf_sent = 0;
    conn_put(fd2conn, conn);
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

connection_io()函数是处理客户端连接的状态机:

static void connection_io(Conn *conn)  {
    if (conn->state == STATE_REQ)  {
        state_req(conn);
    } else if (conn->state == STATE_RES)  {
        state_res(conn);
    } else  {
        assert(0);    // 不该出现这种情况
    }
}
1
2
3
4
5
6
7
8
9

STATE_REQ状态是用来读数据的:

static void state_req(Conn *conn)  {
    while (try_fill_buffer(conn))  {}
}

static bool try_fill_buffer(Conn *conn)  {
    // 尝试填充缓冲区
    assert(conn->rbuf_size < sizeof(conn->rbuf));
    ssize_t rv = 0;
    do  {
        size_t cap = sizeof(conn->rbuf) - conn->rbuf_size;
        rv = read(conn->fd, &conn->rbuf[conn->rbuf_size], cap);
    } while ( rv < 0 && errno == EINTR);
    if ( rv < 0 && errno == EAGAIN)  {
        // 遇到EAGAIN,停止读取
        return false;
    }
    if ( rv < 0)  {
        msg("read() error");
        conn->state = STATE_END;
        return false;
    }
    if ( rv == 0)  {
        if (conn->rbuf_size > 0)  {
            msg("unexpected EOF");
        } else  {
            msg("EOF");
        }
        conn->state = STATE_END;
        return false;
    }

    conn->rbuf_size += (size_t)rv;
    assert(conn->rbuf_size <= sizeof(conn->rbuf) - conn->rbuf_size);

    // 尝试逐个处理请求
    // 为啥这里有个循环呢?去看看“流水线(pipelining)”的解释就懂啦
    while (try_one_request(conn))  {}
    return (conn->state == STATE_REQ);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39

这里面门道可不少。要理解这个函数,咱们回顾一下上一章的伪代码:

def do_something_to_client(fd):
    if should_read_from(fd):
        data = read_until_EAGAIN(fd)
        process_incoming_data(data)
        # 代码省略...
1
2
3
4
5

try_fill_buffer()函数用数据填充读缓冲区。因为读缓冲区大小有限,可能还没遇到EAGAIN,缓冲区就满了,所以读完之后得马上处理数据,腾出点空间,然后继续循环调用try_fill_buffer(),直到遇到EAGAIN。

读系统调用(以及其他系统调用)如果遇到errno是EINTR就得重试。EINTR表示系统调用被信号中断了,就算咱们的程序没用信号,也得重试。

try_one_request函数负责处理收到的数据,可为啥这里要用个循环呢?读缓冲区里难道会有不止一个请求?没错!对于请求/响应协议来说,客户端可不局限于一次发一个请求,然后等着响应。客户端可以连着发好几个请求,中间不用等响应,这样能节省点延迟,这种操作模式就叫“流水线(pipelining)”。所以可别以为读缓冲区里最多就一个请求。

下面看看try_one_request函数的代码:

static bool try_one_request(Conn *conn)  {
    // 尝试从缓冲区解析出一个请求
    if (conn->rbuf_size < 4)  {
        // 缓冲区数据不够,下次循环再试试
        return false;
    }
    uint32_t len = 0;
    memcpy(&len, &conn->rbuf[0], 4);
    if (len > k_max_msg)  {
        msg("too long");
        conn->state = STATE_END;
        return false;
    }
    if (4 + len > conn->rbuf_size)  {
        // 缓冲区数据不够,下次循环再试试
        return false;
    }

    // 拿到一个请求,处理一下
    printf("client says: %. *s\n", len, &conn->rbuf[4]);

    // 生成回显响应
    memcpy(&conn->wbuf[0], &len, 4);
    memcpy(&conn->wbuf[4], &conn->rbuf[4], len);
    conn->wbuf_size = 4 + len;

    // 从缓冲区移除这个请求
    // 注意:频繁调用memmove效率可不高
    // 注意:生产环境的代码得优化下这部分
    size_t remain = conn->rbuf_size - 4 - len;
    if (remain)  {
        memmove(conn->rbuf, &conn->rbuf[4 + len], remain);
    }
    conn->rbuf_size = remain;

    // 切换状态
    conn->state = STATE_RES;
    state_res(conn);

    // 如果请求处理完了,就继续外层循环
    return (conn->state == STATE_REQ);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42

try_one_request函数从读缓冲区取出一个请求,生成响应,然后切换到STATE_RES状态。

下面是STATE_RES状态的代码:

static void state_res(Conn *conn)  {
    while (try_flush_buffer(conn))  {}
}

static bool try_flush_buffer(Conn *conn)  {
    ssize_t rv = 0;
    do  {
        size_t remain = conn->wbuf_size - conn->wbuf_sent;
        rv = write(conn->fd, &conn->wbuf[conn->wbuf_sent], remain);
        if ( rv < 0 && errno == EAGAIN)  {
            // 遇到EAGAIN,停止写入
            return false;
        }
        if ( rv < 0)  {
            msg("write() error");
            conn->state = STATE_END;
            return false;
        }
        conn->wbuf_sent += (size_t)rv;
        assert(conn->wbuf_sent <= conn->wbuf_size);
        if (conn->wbuf_sent == conn->wbuf_size)  {
            // 响应全部发送完毕,切换回STATE_REQ状态
            conn->state = STATE_REQ;
            conn->wbuf_sent = 0;
            conn->wbuf_size = 0;
            return false;
        }
        // 写缓冲区还有数据,可以再试试写入
        return true;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

上面这段代码会不断刷新写缓冲区,直到遇到EAGAIN,要是缓冲区数据都写完了,就切换回STATE_REQ状态。

要测试咱们的服务器,可以运行第4章的客户端,因为协议是一样的。也可以改改客户端代码,来演示下流水线客户端:

// 之前的query函数被拆分成了send_req和read_res
static int32_t send_req(int fd, const char *text);
static int32_t read_res(int fd);

int main()  {
    int fd = socket(AF_INET, SOCK_STREAM, 0);
    if (fd < 0)  {
        die("socket()");
    }

    // 代码省略...

    // 多个流水线请求
    const char *query_list[3] = {"hello1", "hello2", "hello3"};
    for (size_t i = 0; i < 3; ++i)  {
        int32_t err = send_req(fd, query_list[i]);
        if (err)  {
            goto L_DONE;
        }
    }
    for (size_t i = 0; i < 3; ++i)  {
        int32_t err = read_res(fd);
        if (err)  {
            goto L_DONE;
        }
    }

L_DONE:
    close(fd);
    return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31

# 练习:

  1. 试试在事件循环里用epoll替换poll,这应该挺简单的。
  2. 我们用memmove来回收读缓冲区空间,不过每次处理请求都调用memmove没必要,改改代码,让它只在读取之前调用memmove。
  3. 在state_res函数里,一次只写一个响应。在流水线场景下,可以把多个响应先缓存起来,最后用一次write调用把它们都发出去。注意写缓冲区可能中途就满了。
  • 06_client.cpp
  • 06_server.cpp
上次更新: 2025/03/25, 00:48:42
05. 事件循环与非阻塞I/O
07. 基础服务器:实现get、set、del功能

← 05. 事件循环与非阻塞I/O 07. 基础服务器:实现get、set、del功能→

最近更新
01
C++语言面试问题集锦 目录与说明
03-27
02
第四章 Lambda函数
03-27
03
第二章 关键字static及其不同用法
03-27
更多文章>
Copyright © 2024-2025 沪ICP备2023015129号 张小方 版权所有
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式