1. libevent库是什么 1.1 定义 libevent 是一个高性能的事件通知库,提供了一种机制,在文件描述符上发生特定事件或超时后执行回调函数。它抽象了底层的事件通知机制(如 epoll、kqueue 或 select),并在不同平台上提供了一致的接口。常用于网络服务器和其他需要高效事件驱动编程的应用场景。同类型的异步事件库还有 libev,以及 nodejs 中使用的 libuv 等。
1.2 前置技能 阻塞/非阻塞IO和多路复用 正如定义中所说,要理解 libevent 的工作原理,首先需要理解一下基于文件描述符的阻塞/非阻塞IO、epoll/kqueue/select 等多路复用的原理。
可以参考:
reactor模式 在前面一小节提到的文章中,可以看到基于 epoll 等方案实现多路复用的代码中,业务代码需要做非常多的事情,如事件注册、回调函数&数据管理、循环等待等。reactor模式可以通过一定的封装,为业务提供一个基础库,让业务代码的异步调用变得更简单,很多常见的开源软件很多都采用了这个设计思路,如 redis、nginx、netty 等等。
reactor,翻译过来就是“反应器”,还挺直观的 —— 对某事件做出反应,执行对应的操作。
我觉得讲的比较清楚的一篇文章:高性能网络模式:Reactor 和 Proactor (本小节的图都来自这篇文章)
单reactor线程:
可以通过单 reactor 线程的示意图来简单理解 reactor 模式的工作原理:通过 多路复用库(如select)的事件等待机制,将不同的事件(可读、可写),分发(dispatch)给不同的处理器(acceptor / handler)进行处理(accept、read、send)。
单Reactor多线程:
如果要避免单线程中业务处理逻辑阻塞 eventloop,可以使用单 reactor 多线程模式,即一个 reactor 线程用来处理事件分发,具体的业务逻辑由 worker 线程池来完成。
主从reactor多线程:
更进一步,还可以把 server socket 的可读事件(即建立连接),和 client socket 的读写事件分为多个 reactor 线程来处理。主 reactor 收到 client fd 后,即分发给子 reactor,由子 reactor 来管理 client fd 的可读、可写事件,同时把具体的业务逻辑交给 worker 线程池来完成。
Proactor模式
和 reacotr 模式差不多,区别是 reactor 是基于同步IO,内核在数据可读写时通知用户,由用户完成数据的读写,而异步IO是内核完成读写后通知用户。
2. libevent 1.1b 的使用 为了理解 libevent 的基本原理,我选择了它最早的一个子版本 tag 来分析,此版本中已经包含的核心的 reactor 逻辑,没有后续版本的优化,用来看源码非常合适。
https://github.com/libevent/libevent/tree/release-1.1b
2.1 epoll不同封装程度实现异步事件 前面 “前置技能” 的描述还是有点抽象,我们可以看一下,基于 epoll 做不同程度的封装来实现异步事件,是什么样的编码体验。下面的例子中,我们需要监听一个 fd 上的可读可写事件,事件触发后执行相关操作。
直接使用 epoll 直接使用 epoll,我们需要自己管理 epoll fd、自己管理事件循环、自己管理回调参数。下面是一部分伪代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 int efd = epoll_create(10 );epoll_event ev{ EPOLLIN|EPOLLOUT, {.fd=sfd} }; epoll_ctl(efd, EPOLL_CTL_ADD, target_fd, &ev); epoll_event evs[EPOLL_MAX_EVENTS]; while (1 ){ ev_num = epoll_wait(efd, evs, EPOLL_MAX_EVENTS, -1 ); while (i < ev_num) { epoll_event *c_ev = &evs[i]; } }
自行封装简单的 reactor 模式 epoll_event 结构体中,有个 epoll_data 结构体,可以存储一个指针/数值信息
1 2 3 4 5 6 7 typedef union epoll_data{ void *ptr; int fd; uint32_t u32; uint64_t u64; } epoll_data_t ;
可以基于这个参数简单封装,将读写等事件触发后的回调函数(或者如回调参数等更多信息)保存在这个数据中。
1 2 3 4 5 6 7 8 9 10 11 12 int efd = epoll_create(10 );epoll_event ev{ EPOLLIN|EPOLLOUT, {.fd=sfd} }; bind_reacotr_event(&ev, sfd, handler); while (1 ) { dispatch(efd); }
使用 libevent 1.1b 使用 libevent 来实现可以让代码变得更加简单,libevent 将 epoll 相关操作的逻辑封装在了内部,并做了平台兼容,根据不同的运行平台选择合适的库。
1 2 3 4 5 6 7 8 event_base *base = (event_base*)event_init(); event ev; event_set(&ev, sfd, EV_READ | EV_PERSIST, connect_cb, NULL ); event_add(&ev, NULL ); event_base_dispatch(base);
相对最初的直接使用 epoll 的版本,是不是直接又清晰了?
2.2 编译安装 编译运行环境为 centos7,1.1b是个非常老的版本了,编译安装的过程中,得安装一些依赖工具。
1 yum install -y autoconf automake libtool
然后按下面的步骤操作,即可。
将 configure.in 重命名为 configure.ac
运行 aclocal
生成 aclocal.m4 文件
运行 autoconf
生成 configure 文件
运行 automake --add-missing
,会报错,缺少 Makefile.in 和 config.h.in
运行 autoconf -i
重新生成 Makefile.in 等文件
运行 automake --add-missing
运行 ./configure
,生成 makefile
make,编译完成
make install 安装
安装成功后,可以在 /usr/local/lib
中找到 libevent 库文件,/usr/local/include
中找到 event.h 头文件。
2.3 处理各种事件的示例 研究源码之前,可以先看一下我们可以用 libevent 来做些什么,具体来说就是可以处理哪些类型的事件,然后按图索骥,从源码来分析是如何实现这些事件的处理的。
IO 下面是一个使用 libevent 来处理网络IO的示例,server socket 监听起来后,将其加入 libevent 的监听,在有新的客户端连接建立时,将 client fd 也加入到 libevent 的监听,并处理 client socket 上的读写事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <event.h> #include <sys/socket.h> #include <arpa/inet.h> #include <fcntl.h> #include <strings.h> #include <errno.h> #define handle_error(msg) do { perror(msg); exit(EXIT_FAILURE); } while (0 ) #define SERVER_PORT 8888 #define BUFFER_SIZE 1024 void read_cb (int cfd, short e, void *arg) { char buf[1024 ]; while (true ) { bzero(buf, sizeof (buf)); int ret = recv(cfd, buf, sizeof (buf), 0 ); if (ret == 0 ) { event_del((event*)arg); close(cfd); break ; } if (ret == -1 ){ if (errno == EAGAIN || errno == EWOULDBLOCK) break ; perror("recv client fd data error" ); event_del((event*)arg); close(cfd); free (arg); break ; } printf ("<---: %s \n" , buf); ret = send(cfd, buf, sizeof (buf), 0 ); if (ret !=0 ) perror("send data error" ); printf ("--->: %s \n" , buf); } } void connect_cb (int sfd, short e, void *arg) { while (true ) { sockaddr_in addr; socklen_t addr_len = sizeof (addr); int cfd = accept(sfd, (sockaddr*)&addr, &addr_len); if (cfd == -1 ) { if (errno == EAGAIN || errno == EWOULDBLOCK) break ; perror("accept from server fd error" ); break ; } int flags = fcntl(cfd, F_GETFL, 0 ); fcntl(cfd, F_SETFL, flags | O_NONBLOCK); struct event *ev = (event*)malloc (sizeof (struct event)); event_set(ev, cfd, EV_READ|EV_PERSIST, read_cb, ev); event_add(ev, NULL ); } } int main () { int sfd = socket(AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0 ); sockaddr_in addr; addr.sin_addr.s_addr = htonl(INADDR_ANY); addr.sin_family = AF_INET; addr.sin_port = htons(SERVER_PORT); int ret = bind(sfd, (sockaddr*)&addr, sizeof (addr)); if (ret != 0 ) handle_error("bind fail" ); ret = listen(sfd, 10 ); if (ret != 0 ) handle_error("listen fail" ); event_base *base = (event_base*)event_init(); event ev; event_set(&ev, sfd, EV_READ | EV_PERSIST, connect_cb, NULL ); event_add(&ev, NULL ); event_base_dispatch(base); printf ("exit loop\n" ); return 0 ; }
timerfd 下面是一个使用 libevent 处理 timerfd 事件的例子,将 timerfd 加入 libevent 的持续监听,每隔一段时间触发一次回调。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <sys/timerfd.h> #include <event.h> void timer_cb (int fd, short event, void *arg) { while (true ) { u_int64_t count; int ret = read(fd, &count, sizeof (count)); if (ret == -1 ){ break ; } printf ("read timeout count: %u\n" , count); } } int main () { event_base *base = (event_base*)event_init(); int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); itimerspec ts = {{2 , 0 }, {6 , 0 }}; timerfd_settime(tfd, 0 , &ts, NULL ); event ev; event_set(&ev, tfd, EV_READ | EV_PERSIST, timer_cb, NULL ); event_add(&ev, NULL ); event_base_dispatch(base); printf ("exit loop\n" ); return 0 ; }
signal 下面是一个使用 libevent 处理进程信号的例子。运行过程中使用 kill 发送监听的信号值,即可触发回调。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <event.h> #include <signal.h> #include <sys/signalfd.h> #include <stdio.h> #include <event.h> #include <signal.h> void signal_cb (int fd, short event, void *arg) { struct event *sig = (struct event*)arg; printf ("caught an signal %d \n" , fd); } int main (int argc, char **argv) { struct event signal_int; struct timeval tv; struct event_base *base = (event_base*)event_init(); event_set(&signal_int, SIGRTMIN + 10 , EV_SIGNAL|EV_PERSIST, signal_cb, &signal_int); event_add(&signal_int, NULL ); event_base_dispatch(base); return 0 ; }
timeout 监听事件时,可以添加一个超时事件,一旦超时时间到,则会调用一次回调,回调中的 event 设置为 EV_TIMEOUT,并将事件从监听中移除。在前面 timerfd 的示例基础上,加入超时时间:
1 2 timeval timeout = {1 , 0 }; event_add(&ev, &timeout);
回调中可以判断触发的事件类型
1 2 3 4 5 6 void timer_cb (int fd, short event, void *arg) { if (event & EV_TIMEOUT) { printf (" event timeout \n" ); return ; } }
evbuffer 和 bufferevent 管理内存缓冲区 前面处理 IO 的例子中,是自行管理的 client fd 上的读写事件,实际上 libevent 还提供了一个 evbuffer 和 bufferevent 来管理内存缓冲区和 fd 上的读写事件。
把 IO 例子中的 fd 读写操作替换为 evbuffer 和 bufferevent:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 #include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <event.h> #include <sys/socket.h> #include <arpa/inet.h> #include <fcntl.h> #include <strings.h> #include <errno.h> #define handle_error(msg) do { perror(msg); exit(EXIT_FAILURE); } while (0 ) #define SERVER_PORT 8888 #define BUFFER_SIZE 1024 void read_cb (bufferevent* bufev, void * carg) { char * buf = (char *)carg; int size = bufferevent_read(bufev, buf, BUFFER_SIZE); if (size > 0 ) { printf ("<---: %s \n" , buf); bufferevent_write(bufev, buf, size); printf ("--->: %s \n" , buf); } } void write_cb (bufferevent* bufev, void * carg) { printf ("---> write done \n" ); } void error_cb (struct bufferevent * bufev, short what, void * carg) { if (what & EVBUFFER_EOF) { close(bufev->ev_read.ev_fd); bufferevent_free(bufev); return ; } perror("client fd error" ); close(bufev->ev_read.ev_fd); bufferevent_free(bufev); } void connect_cb (int sfd, short e, void *arg) { while (true ) { sockaddr_in addr; socklen_t addr_len = sizeof (addr); int cfd = accept(sfd, (sockaddr*)&addr, &addr_len); if (cfd == -1 ) { if (errno == EAGAIN || errno == EWOULDBLOCK) break ; perror("accept from server fd error" ); break ; } int flags = fcntl(cfd, F_GETFL, 0 ); fcntl(cfd, F_SETFL, flags | O_NONBLOCK); char *buf = (char *)malloc (BUFFER_SIZE); bufferevent* bufev = bufferevent_new(cfd, read_cb, write_cb, error_cb, buf); bufferevent_enable(bufev, EV_READ); } } int main () { int sfd = socket(AF_INET, SOCK_STREAM|SOCK_NONBLOCK, 0 ); sockaddr_in addr; addr.sin_addr.s_addr = htonl(INADDR_ANY); addr.sin_family = AF_INET; addr.sin_port = htons(SERVER_PORT); int ret = bind(sfd, (sockaddr*)&addr, sizeof (addr)); if (ret != 0 ) handle_error("bind fail" ); ret = listen(sfd, 10 ); if (ret != 0 ) handle_error("listen fail" ); event_base *base = (event_base*)event_init(); event ev; event_set(&ev, sfd, EV_READ | EV_PERSIST, connect_cb, NULL ); event_add(&ev, NULL ); event_base_dispatch(base); printf ("exit loop\n" ); return 0 ; }
3. libevent 1.1b 源码分析 选择 1.1b 版本的原因是,它是 libevent 最早的一个版本,实现了主体功能,没有做后期复杂的优化和更多高级功能的支持,对阅读源码来说比较简单。
我在这里 https://github.com/zouchengzhuo/libevent/tree/release-1.1b-comment 提交了一个带阅读注释的分支。
3.1 整体视图
如图中所示,libevent 工作的基本流程就是:
提供一个封装事件的结构体 event,一个核心入口 event_base 用于存储事件和事件调度器
通过 event_add 将事件加入存储,同时注册到根据运行平台选择的多路复用管理器 eventop 上
eventop 注册事件到内核多路复用库的监听,并在 eventloop 中执行 dispatch 操作,在有事件触发时将 event 加入到带优先级的激活队列中
在 eventloop 中执行激活队列中 event 携带的回调函数
下面将逐个分析这些环节的细节,并分析 libevent 内部是如何处理各类事件的。
3.2 event 核心事件 event 结构体就是异步事件驱动中的 “事件”。所有的事件均由此结构体描述,并绑定到 libevent 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 struct event { TAILQ_ENTRY (event) ev_next; TAILQ_ENTRY (event) ev_active_next; TAILQ_ENTRY (event) ev_signal_next; RB_ENTRY (event) ev_timeout_node; struct event_base *ev_base; int ev_fd; short ev_events; short ev_ncalls; short *ev_pncalls; struct timeval ev_timeout; int ev_pri; void (*ev_callback)(int , short , void *arg); void *ev_arg; int ev_res; int ev_flags; };
其中,ev_events 可以设置的事件类型含:
1 2 3 4 5 #define EV_TIMEOUT 0x01 #define EV_READ 0x02 #define EV_WRITE 0x04 #define EV_SIGNAL 0x08 #define EV_PERSIST 0x10
ev_flags 可以描述的当前 event 所在的队列类型有:
1 2 3 4 5 6 #define EVLIST_TIMEOUT 0x01 #define EVLIST_INSERTED 0x02 #define EVLIST_SIGNAL 0x04 #define EVLIST_ACTIVE 0x08 #define EVLIST_INTERNAL 0x10 #define EVLIST_INIT 0x80
3.3 event_base 核心入口 event_base 结构体是 libevent 库的核心和主入口,它保存了多路复用库对象和所有被监听的事件信息。
在这里 https://github.com/zouchengzhuo/libevent/blob/release-1.1b-comment/event-internal.h#L40 可以看到 event_base 的结构。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 struct event_base { const struct eventop *evsel; void *evbase; int event_count; int event_count_active; int event_gotterm; struct event_list **activequeues; int nactivequeues; struct event_list eventqueue; struct timeval event_tv; RB_HEAD(event_tree, event) timetree; };
从结构中可以看到,event_base 结构体中保存了:
当前运行环境中选择的多路复用库的 eventop 对象
当前运行环境中选择的多路复用库的属性对象,如 epollop、kqop(kqueue op)等
当前被注册监听的事件计数
当前被激活的事件计数
eventloop停止开关
激活事件优先级队列数组
激活事件优先级队列数组长度
保存全部 event 的链表
当前时间
保存超时事件的红黑树
3.4 eventop 多路复用库封装 libevent 根据不同的运行平台选择不同的多路复用库,不过所有平台的库都会被封装为一个 eventop 的结构体实现。
eventop 的结构在这里 https://github.com/zouchengzhuo/libevent/blob/release-1.1b-comment/event.h#L123 可以看到。
而epoll.c
、kqueue.c
、poll.c
、select.c
中则实现了各种多路复用库的封装。
以 epoll 的封装为例 https://github.com/zouchengzhuo/libevent/blob/release-1.1b-comment/epoll.c#L65 。
1 2 3 4 5 6 7 8 struct eventop epollops = { "epoll" , epoll_init, epoll_add, epoll_del, epoll_recalc, epoll_dispatch };
下面挑几个比较重要的方法分析下它们都做了哪些事情。
epoll_init epoll_init 中,执行了 epoll fd 初始化的逻辑,并且创建了一个 epollop 变量返回,这个变量将被保存到 event_base 的 evbase 属性中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 struct evepoll { struct event *evread; struct event *evwrite; }; struct epollop { struct evepoll *fds; int nfds; struct epoll_event *events; int nevents; int epfd; sigset_t evsigmask; };
epoll_add epoll_add 中,实现了事件注册的逻辑。
1 2 3 4 5 6 7 8 9 * @brief 向 epoll 中添加事件注册 * * @param arg epollop 指针 * @param ev 需要被添加到 epoll 监听的 event 指针 * @return int */ int epoll_add (void *arg, struct event *ev)
这里内部实现中需要注意的是,libevent实现分配好了 32000 个 epollop 变量的空间(即 epollop.fds 属性),然后把 event 中的 fd 当做索引,判断该索引是否已经绑定过了时间,若绑定过了,则使用 epoll_ctl + EPOLL_CTL_MOD 更新 epoll 监听,否则使用 epoll_ctl + EPOLL_CTL_ADD 向 epoll 添加监听。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 evep = &epollop->fds[fd]; op = EPOLL_CTL_ADD; events = 0 ; if (evep->evread != NULL ) { events |= EPOLLIN; op = EPOLL_CTL_MOD; } if (evep->evwrite != NULL ) { events |= EPOLLOUT; op = EPOLL_CTL_MOD; } if (ev->ev_events & EV_READ) events |= EPOLLIN; if (ev->ev_events & EV_WRITE) events |= EPOLLOUT; epev.data.ptr = evep; epev.events = events; if (epoll_ctl(epollop->epfd, op, ev->ev_fd, &epev) == -1 ) return (-1 );
epoll_del 和 epoll_add 类似,做了相反的操作,即如果读和写都被删掉了,则同过 EPOLL_CTL_DEL 去掉 epoll 监听,若还有剩下的,则通过 EPOLL_CTL_MOD 更新 epoll 监听。1 2 int epoll_del (void *arg, struct event *ev)
epoll_dispatch 在 epoll_dispatch 中,调用一次 epoll_wait,若有触发了可读/可写事件的 event,则调用 event_active 函数,将它们放到 event_base 的激活队列中去。
同时,若 event 的 ev_events 属性中没有 EV_PERSIST 的 flag,则代表这个事件是一次性的,本次触发完成后,通过 epoll_del 函数将其从 epoll 的监听中删除。1 2 3 4 5 6 7 8 9 10 * @brief 执行一次 epoll_wait,并分发被激活的事件 * * @param base event_base 指针 * @param arg epolltop指针 * @param tv epoll_wait 超时时间 * @return int */ int epoll_dispatch (struct event_base *base, void *arg, struct timeval *tv)
3.5 event_add 事件注册 可以看到,event_add 中做了三件事:
将事件插入到 event_base 的事件队列中
如果传入了超时时间,则将 event 加入到超时事件红黑树
将事件加入到多路复用库的监听列表
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 * @brief 向 event_base 中绑定一个 event * * @param ev 被绑定的 event * @param tv 超时时间 * @return int */ int event_add (struct event *ev, struct timeval *tv) { struct event_base *base = ev->ev_base; const struct eventop *evsel = base->evsel; void *evbase = base->evbase; event_debug(( "event_add: event: %p, %s%s%scall %p" , ev, ev->ev_events & EV_READ ? "EV_READ " : " " , ev->ev_events & EV_WRITE ? "EV_WRITE " : " " , tv ? "EV_TIMEOUT " : " " , ev->ev_callback)); assert(!(ev->ev_flags & ~EVLIST_ALL)); if (tv != NULL ) { struct timeval now; if (ev->ev_flags & EVLIST_TIMEOUT) event_queue_remove(base, ev, EVLIST_TIMEOUT); * this timeout before the callback can be executed * removes it from the active list. */ if ((ev->ev_flags & EVLIST_ACTIVE) && (ev->ev_res & EV_TIMEOUT)) { * event in a loop */ if (ev->ev_ncalls && ev->ev_pncalls) { *ev->ev_pncalls = 0 ; } event_queue_remove(base, ev, EVLIST_ACTIVE); } gettimeofday(&now, NULL ); timeradd(&now, tv, &ev->ev_timeout); event_debug(( "event_add: timeout in %d seconds, call %p" , tv->tv_sec, ev->ev_callback)); event_queue_insert(base, ev, EVLIST_TIMEOUT); } if ((ev->ev_events & (EV_READ|EV_WRITE)) && !(ev->ev_flags & (EVLIST_INSERTED|EVLIST_ACTIVE))) { event_queue_insert(base, ev, EVLIST_INSERTED); return (evsel->add(evbase, ev)); } else if ((ev->ev_events & EV_SIGNAL) && !(ev->ev_flags & EVLIST_SIGNAL)) { event_queue_insert(base, ev, EVLIST_SIGNAL); return (evsel->add(evbase, ev)); } return (0 ); }
3.6 event_del 事件删除 删除的逻辑和添加相反:
将事件从 event_base 的事件队列中移除
将事件从多路复用库的监听列表中移除
1 2 int event_del (struct event *ev)
3.7 event_base_loop 事件循环 event_base_loop 就是 libevent 库的时间循环启动方法,在时间循环中不断调用多路复用库的 dispatch,阻塞等待可用的事件触发。
事件触发后,会由多路复用库将事件写入到激活队列中,此时在 event_base_loop 中按优先级遍历激活队列中的事件,并调用它们的回调函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 int event_base_loop (struct event_base *base, int flags) { const struct eventop *evsel = base->evsel; void *evbase = base->evbase; struct timeval tv; int res, done; if (evsel->recalc(base, evbase, 0 ) == -1 ) return (-1 ); done = 0 ; while (!done) { if (base->event_gotterm) { base->event_gotterm = 0 ; break ; } while (event_gotsig) { event_gotsig = 0 ; if (event_sigcb) { res = (*event_sigcb)(); if (res == -1 ) { errno = EINTR; return (-1 ); } } } gettimeofday(&tv, NULL ); if (timercmp(&tv, &base->event_tv, <)) { struct timeval off; event_debug(("%s: time is running backwards, corrected" , __func__)); timersub(&base->event_tv, &tv, &off); timeout_correct(base, &off); } base->event_tv = tv; if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) timeout_next(base, &tv); else timerclear(&tv); if (!event_haveevents(base)) { event_debug(("%s: no events registered." , __func__)); return (1 ); } res = evsel->dispatch(base, evbase, &tv); if (res == -1 ) return (-1 ); timeout_process(base); if (base->event_count_active) { event_process_active(base); if (!base->event_count_active && (flags & EVLOOP_ONCE)) done = 1 ; } else if (flags & EVLOOP_NONBLOCK) done = 1 ; if (evsel->recalc(base, evbase, 0 ) == -1 ) return (-1 ); } event_debug(("%s: asked to terminate loop." , __func__)); return (0 ); }
上面的代码中,值的注意的是:
为了避免时钟回拨的问题,每轮 loop 中都会判断一次时间是否倒退,如果是,则修正超时红黑树中的时间。
dispatch 的超时时间是每次循环时重新计算的,若有超时事件,则使用最近一次超时事件作为 dispatch 的超时时间,否则使用默认的 5s。
3.8 IO/timeout/signal的处理逻辑 IO event 处理 IO 事件的处理比较简单,就通过多路复用库来实现监听触发就可以了。比较有特点的是它对事件优先级的处理。
创建 event,并通过 event_set 设置 event 属性。
1 2 3 4 5 6 7 8 9 10 // event.c void event_set(struct event *ev, int fd, short events, void (*callback)(int, short, void *), void *arg) { //...... 省略其它属性设置逻辑 /* by default, we put new events into the middle priority */ // 设置默认优先级,为 event_base 的 活跃队列数量的 一半,即默认中等优先级 ev->ev_pri = current_base->nactivequeues/2; }
通过 event_add 添加到 event_base 监听,进入事件队列,添加到多路复用库监听,若设置了超时时间,加入到超时事件红黑树。细节参考前面的 event_add 的逻辑。
1 2 3 4 5 6 7 8 9 * @brief 向 event_base 中绑定一个 event * * @param ev 被绑定的 event * @param tv 超时时间 * @return int */ int event_add (struct event *ev, struct timeval *tv)
通过 event_base_dispatch 循环调用多路复用库的 dispatch,等待事件触发。
1 2 3 4 5 6 7 8 9 10 11 12 13 int event_base_loop (struct event_base *base, int flags) { while (!done) { res = evsel->dispatch(base, evbase, &tv); } }
多路复用库的 dispatch 中,若有触发事件,将其加入激活队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 * @brief 执行一次 epoll_wait,并分发被激活的事件 * * @param base event_base 指针 * @param arg epolltop指针 * @param tv epoll_wait 超时时间 * @return int */ int epoll_dispatch (struct event_base *base, void *arg, struct timeval *tv) { struct epollop *epollop = arg; struct epoll_event *events = epollop->events; struct evepoll *evep; int i, res, timeout; if (evsignal_deliver(&epollop->evsigmask) == -1 ) return (-1 ); timeout = tv->tv_sec * 1000 + (tv->tv_usec + 999 ) / 1000 ; res = epoll_wait(epollop->epfd, events, epollop->nevents, timeout); if (evsignal_recalc(&epollop->evsigmask) == -1 ) return (-1 ); if (res == -1 ) { if (errno != EINTR) { event_warn("epoll_wait" ); return (-1 ); } evsignal_process(); return (0 ); } else if (evsignal_caught) evsignal_process(); event_debug(("%s: epoll_wait reports %d" , __func__, res)); for (i = 0 ; i < res; i++) { int which = 0 ; int what = events[i].events; struct event *evread = NULL , *evwrite = NULL ; evep = (struct evepoll *)events[i].data.ptr; if (what & EPOLLHUP) what |= EPOLLIN | EPOLLOUT; else if (what & EPOLLERR) what |= EPOLLIN | EPOLLOUT; if (what & EPOLLIN) { evread = evep->evread; which |= EV_READ; } if (what & EPOLLOUT) { evwrite = evep->evwrite; which |= EV_WRITE; } if (!which) continue ; if (evread != NULL && !(evread->ev_events & EV_PERSIST)) event_del(evread); if (evwrite != NULL && evwrite != evread && !(evwrite->ev_events & EV_PERSIST)) event_del(evwrite); if (evread != NULL ) event_active(evread, EV_READ, 1 ); if (evwrite != NULL ) event_active(evwrite, EV_WRITE, 1 ); } return (0 ); }
事件加入激活队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 * @brief 将 event 激活并放入到激活队列中 * * @param ev event * @param res 触发事件,如 EV_TIMEOUT / EV_READ / EV_WRITE * @param ncalls 触发次数 */ void event_active (struct event *ev, int res, short ncalls) { if (ev->ev_flags & EVLIST_ACTIVE) { ev->ev_res |= res; return ; } ev->ev_res = res; ev->ev_ncalls = ncalls; ev->ev_pncalls = NULL ; event_queue_insert(ev->ev_base, ev, EVLIST_ACTIVE); }
在 event_base_dispatch 一次循环等待完成后,执行激活事件列表中的回调函数。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 int event_base_loop (struct event_base *base, int flags) { while (!done) { if (base->event_count_active) { event_process_active(base); if (!base->event_count_active && (flags & EVLOOP_ONCE)) done = 1 ; } else if (flags & EVLOOP_NONBLOCK) done = 1 ; } }
处理激活队列中的事件回调
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 * @brief 处理激活队列中的事件,每个事件都根据 ev_ncalls 调用对应次数的回调函数,并将其从激活队列中移除 * * @param base */ static void event_process_active (struct event_base *base) { struct event *ev; struct event_list *activeq = NULL ; int i; short ncalls; if (!base->event_count_active) return ; for (i = 0 ; i < base->nactivequeues; ++i) { if (TAILQ_FIRST(base->activequeues[i]) != NULL ) { activeq = base->activequeues[i]; break ; } } for (ev = TAILQ_FIRST(activeq); ev; ev = TAILQ_FIRST(activeq)) { event_queue_remove(base, ev, EVLIST_ACTIVE); ncalls = ev->ev_ncalls; ev->ev_pncalls = &ncalls; while (ncalls) { ncalls--; ev->ev_ncalls = ncalls; (*ev->ev_callback)((int )ev->ev_fd, ev->ev_res, ev->ev_arg); } } }
timeout 处理 libevent 通过不断的动态计算每次 dispatch 的超时时间,来实现对事件自身超时的处理。
event 通过 event_add 添加到 event_base 的监听中时,若传入了超时时间,则加入超时红黑树。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 * @brief 向 event_base 中绑定一个 event * * @param ev 被绑定的 event * @param tv 超时时间 * @return int */ int event_add (struct event *ev, struct timeval *tv) { if (tv != NULL ) { struct timeval now; if (ev->ev_flags & EVLIST_TIMEOUT) event_queue_remove(base, ev, EVLIST_TIMEOUT); * this timeout before the callback can be executed * removes it from the active list. */ if ((ev->ev_flags & EVLIST_ACTIVE) && (ev->ev_res & EV_TIMEOUT)) { * event in a loop */ if (ev->ev_ncalls && ev->ev_pncalls) { *ev->ev_pncalls = 0 ; } event_queue_remove(base, ev, EVLIST_ACTIVE); } gettimeofday(&now, NULL ); timeradd(&now, tv, &ev->ev_timeout); event_debug(( "event_add: timeout in %d seconds, call %p" , tv->tv_sec, ev->ev_callback)); event_queue_insert(base, ev, EVLIST_TIMEOUT); } }
通过 event_base_dispatch 循环调用多路复用库的 dispatch 时,查找最近的一个超时时间的超时时间点。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 int event_base_loop (struct event_base *base, int flags) { while (!done) { base->event_tv = tv; if (!base->event_count_active && !(flags & EVLOOP_NONBLOCK)) timeout_next(base, &tv); else timerclear(&tv); if (!event_haveevents(base)) { event_debug(("%s: no events registered." , __func__)); return (1 ); } res = evsel->dispatch(base, evbase, &tv); } }
查找最近的超时时间点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 * @brief 计算 epoll_wait 的超时时间,若没有超时 event,返回默认的 5s;若有超时的 event,则返回当前时间距离最近超时 event 的时间差(最小值为0)。 * * @param base event_base 对象 * @param tv epoll_wait 的超时时间 * @return int */ int timeout_next (struct event_base *base, struct timeval *tv) { struct timeval dflt = TIMEOUT_DEFAULT; struct timeval now; struct event *ev; if ((ev = RB_MIN(event_tree, &base->timetree)) == NULL ) { *tv = dflt; return (0 ); } if (gettimeofday(&now, NULL ) == -1 ) return (-1 ); if (timercmp(&ev->ev_timeout, &now, <=)) { timerclear(tv); return (0 ); } timersub(&ev->ev_timeout, &now, tv); assert(tv->tv_sec >= 0 ); assert(tv->tv_usec >= 0 ); event_debug(("timeout_next: in %d seconds" , tv->tv_sec)); return (0 ); }
多路复用库的 dispatch 返回时,判断是否有已经超时的时间,若有,将其加入到激活队列,并从事件队列、超时红黑树中删除。
1 2 3 4 5 6 7 8 9 10 11 12 13 int event_base_loop (struct event_base *base, int flags) { while (!done) { timeout_process(base); } }
判断超时,加入激活队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 * @brief 处理超时队列中的 event,判断是否超时,已经超时的从其它队列移除,给放到激活队列中 * * @param base */ void timeout_process (struct event_base *base) { struct timeval now; struct event *ev, *next; gettimeofday(&now, NULL ); for (ev = RB_MIN(event_tree, &base->timetree); ev; ev = next) { if (timercmp(&ev->ev_timeout, &now, >)) break ; next = RB_NEXT(event_tree, &base->timetree, ev); event_queue_remove(base, ev, EVLIST_TIMEOUT); event_del(ev); event_debug(("timeout_process: call %p" , ev->ev_callback)); event_active(ev, EV_TIMEOUT, 1 ); } }
event_base_dispatch 中处理激活队列中的事件,若发现超时的 event,则执行其回调,这里的逻辑就和 IO 事件处理中的一致了。
1 2 3 4 5 6 7 8 * @brief 处理激活队列中的事件,每个事件都根据 ev_ncalls 调用对应次数的回调函数,并将其从激活队列中移除 * * @param base */ static void event_process_active (struct event_base *base)
signal 处理 在比较新版本的 linux 内核中,进程信号可以通过 signalfd 来监听。
libevent 1.1b 在 2002 年就发布了,signalfd 从 linux kernel 2.6.22 版本开始支持 https://man7.org/linux/man-pages/man2/signalfd.2.html#VERSIONS ,而 2.6.22 版本是 2008 年才发布的。 https://zh.wikipedia.org/wiki/Linux%E5%86%85%E6%A0%B8%E7%89%88%E6%9C%AC%E5%8E%86%E5%8F%B2#%E7%89%88%E6%9C%AC_2.6.x.y 。
所以没办法用前面的多路复用机制来实现进程信号的监听,下面我们看看 libevent 是怎么巧妙的实现 signal 监听的。
整体流程如上图。首先,通过 event_init 初始化 event_base 时,会同时初始化多路复用库。
1 2 3 4 5 6 7 8 9 10 void *event_init (void ) { current_base->evbase = NULL ; for (i = 0 ; eventops[i] && !current_base->evbase; i++) { current_base->evsel = eventops[i]; current_base->evbase = current_base->evsel->init(); } }
多路复用库初始化时:
初始化一个 sigset_t 变量 evsigmask,用于记录该 epoll 上要监听的信号值
由于多路复用库只能处理 fd,不能处理进程信号,所以需要创建一个用于占位的内部事件 ev_signal 加入 epoll 监听,不加这个的话,就没办法利用多路复用库来阻塞等待信号的发生了
初始化一对全双工的 socket :ev_signal_pair,将其中一端 ev_signal_pair[1] 的可读事件设置给 ev_signal,绑定到 event_base 的监听中,可读事件触发后,调用一次 evsignal_cb。
下面以 epoll.c 中的实现为例:
1 2 3 4 5 6 7 void *epoll_init (void ) { evsignal_init(&epollop->evsigmask); }
evsignal_init 中,做了上面提到的三个工作:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 * @brief 初始化 ev_signal 这个 event,监听它的读事件,读到信号后,触发 evsignal_cb * * @param evsigmask */ void evsignal_init (sigset_t *evsigmask) { sigemptyset(evsigmask); * Our signal handler is going to write to one end of the socket * pair to wake up our event loop. The event loop then scans for * signals that got delivered. */ if (socketpair(AF_UNIX, SOCK_STREAM, 0 , ev_signal_pair) == -1 ) event_err(1 , "%s: socketpair" , __func__); FD_CLOSEONEXEC(ev_signal_pair[0 ]); FD_CLOSEONEXEC(ev_signal_pair[1 ]); event_set(&ev_signal, ev_signal_pair[1 ], EV_READ, evsignal_cb, &ev_signal); ev_signal.ev_flags |= EVLIST_INTERNAL; }
初始化完毕后,在使用时,创建 event并通过 event_set 设置 event 属性。其中,若要监听进程信号,则在监听的事件类型 ev_events 中加上 EV_SIGNAL,并将需要监听的信号值设置为 event 的 ev_fd。
event创建并设置好后,通过 event_add 加入 event_base 监听,事件加入信号事件队列,并添加多路复用库监听。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 * @brief 向 event_base 中绑定一个 event * * @param ev 被绑定的 event * @param tv 超时时间 * @return int */ int event_add (struct event *ev, struct timeval *tv) { if ((ev->ev_events & EV_SIGNAL) && !(ev->ev_flags & EVLIST_SIGNAL)) { event_queue_insert(base, ev, EVLIST_SIGNAL); return (evsel->add(evbase, ev)); } }
多路复用库添加监听时,判断如果是 signal 事件,则不是直接将 event.ev_fd 加入到 epoll 监听,此时 event.ev_fd 保存的实际上是要监听的 信号值,将其加入到 epoll 的变量 evsigmask 即可。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 * @brief 向 epoll 中添加事件注册 * * @param arg epollop 指针 * @param ev 需要被添加到 epoll 监听的 event 指针 * @return int */ int epoll_add (void *arg, struct event *ev) { if (ev->ev_events & EV_SIGNAL) return (evsignal_add(&epollop->evsigmask, ev)); }
在 evsignal_add 函数中,将 event 中携带的,要监听的信号值,设置给 epollop 的 evsigmask 属性。
多路复用库的 dispatch 操作中,在一次阻塞等待结束后,调用一次 evsignal_recalc。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 * @brief 执行一次 epoll_wait,并分发被激活的事件 * * @param base event_base 指针 * @param arg epolltop指针 * @param tv epoll_wait 超时时间 * @return int */ int epoll_dispatch (struct event_base *base, void *arg, struct timeval *tv) { if (evsignal_recalc(&epollop->evsigmask) == -1 ) return (-1 ); }
evsignal_recalc 中遍历 signal events 列表,通过 sigaction 将进程信号值加入到内核的信号监听中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 * @brief 遍历 信号event 队列, 用 sigaction 重新注册需要监听的信号量,以及收到信号后的响应事件 * * @param evsigmask * @return int */ int evsignal_recalc (sigset_t *evsigmask) { struct sigaction sa; struct event *ev; if (!ev_signal_added) { ev_signal_added = 1 ; event_add(&ev_signal, NULL ); } if (TAILQ_FIRST(&signalqueue) == NULL && !needrecalc) return (0 ); needrecalc = 0 ; if (sigprocmask(SIG_BLOCK, evsigmask, NULL ) == -1 ) return (-1 ); memset (&sa, 0 , sizeof (sa)); sa.sa_handler = evsignal_handler; sa.sa_mask = *evsigmask; sa.sa_flags |= SA_RESTART; TAILQ_FOREACH(ev, &signalqueue, ev_signal_next) { if (sigaction(EVENT_SIGNAL(ev), &sa, NULL ) == -1 ) return (-1 ); } return (0 ); }
当信号触发时,调用一次 evsignal_handler。evsignal_handler中,设置一下信号被触发的标记 evsignal_caught ,信号触发次数 evsigcaught[sig]+1,并向 ev_signal_pair[0] 中写入一个字符 a,此操作会触发 ev_signal 绑定的可读回调 evsignal_cb,将 ev_signal 重新注册进 event_base 中。这里要注意的是,触发 evsignal_cb 的操作本身并无意义,只是重新添加一次 ev_signal,它的真正意义是能立即结束多路复用库的等待阻塞,立即结束 wait 并进入后面的激活事件处理流程。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 * @brief 信号触发时,设置evsignal_caught为1, * 增加evsigcaught中该sig的触发计数,并向 ev_signal_pair[0] 写入一个字符,以触发 此文件中 ev_signal 的可读事件 * * @param sig 信号值,是 event 的 fd */ static void evsignal_handler (int sig) { evsigcaught[sig]++; evsignal_caught = 1 ; write(ev_signal_pair[0 ], "a" , 1 ); }
多路复用 dispatch 中,判断,若 evsignal_caught 标记已经被设置,则调用一次 evsignal_process。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 * @brief 执行一次 epoll_wait,并分发被激活的事件 * * @param base event_base 指针 * @param arg epolltop指针 * @param tv epoll_wait 超时时间 * @return int */ int epoll_dispatch (struct event_base *base, void *arg, struct timeval *tv) { if (res == -1 ) { if (errno != EINTR) { event_warn("epoll_wait" ); return (-1 ); } evsignal_process(); return (0 ); } else if (evsignal_caught) evsignal_process(); }
evsignal_process函数中,遍历信号事件队列,从 evsigcaught[sig] 中取出信号的触发次数,将信号 event 加入到 event_base 的激活队列中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 * @brief 处理信号 event,通过 evsigcaught[event->fd] 判断触发次数,将信号 event 放入 event_base 的激活队列 * */ void evsignal_process (void ) { struct event *ev; short ncalls; TAILQ_FOREACH(ev, &signalqueue, ev_signal_next) { ncalls = evsigcaught[EVENT_SIGNAL(ev)]; if (ncalls) { if (!(ev->ev_events & EV_PERSIST)) event_del(ev); event_active(ev, EV_SIGNAL, ncalls); } } memset (evsigcaught, 0 , sizeof (evsigcaught)); evsignal_caught = 0 ; }
后面的流程就和 IO 一致了,event_base 的 loop 中处理激活队列中的事件,把信号事件处理掉,调用注册的回调。
1 2 3 4 5 6 7 8 * @brief 处理激活队列中的事件,每个事件都根据 ev_ncalls 调用对应次数的回调函数,并将其从激活队列中移除 * * @param base */ static void event_process_active (struct event_base *base)
3.9 evbuffer 除了事件库本身,libevent 还提供了一个内存缓冲区管理的组件,有了它,我们就不需要自己分配、扩容、释放内存空间了。
这块的源码组织有点奇怪,用于管理内存的组件名为 evbuffer,主体逻辑源码位于 buffer.c 中;基于evbuffer封装,用于管理 fd 上的读写事件的组件叫 bufferevent,源码位于 evbuffer.c 中。
evbuffer evbuffer 结构体的定义:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 struct evbuffer { u_char *buffer; u_char *orig_buffer; size_t misalign; size_t totallen; size_t off; void (*cb)(struct evbuffer *, size_t , size_t , void *); void *cbarg; };
光看代码和注释还是不够直观,我找到一张图,感觉描述的比较清楚。
在此数据结构上,提供了一堆用于读、写、扩容、删除等操作的函数,就不一一分析了,这里有一些阅读注释 https://github.com/zouchengzhuo/libevent/blob/release-1.1b-comment/buffer.c 。只挑一些比较有意思的点放在这里说一下吧。
在写入数据时,若内存空间不够,则会调用 evbuffer_expand 对内存区块进行扩容,扩容的策略是不够就翻倍。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 * @brief 对 buf 的内存区域进行扩容,最小256字节,每次翻倍扩容。扩容前先对数据进行强制对齐。 * * @param buf 要扩容的 evbuffer * @param datlen 本次扩容后要插入的数据长度 * @return int */ int evbuffer_expand (struct evbuffer *buf, size_t datlen) { size_t need = buf->misalign + buf->off + datlen; if (buf->totallen >= need) return (0 ); * If the misalignment fulfills our data needs, we just force an * alignment to happen. Afterwards, we have enough space. */ if (buf->misalign >= datlen) { evbuffer_align(buf); } else { void *newbuf; size_t length = buf->totallen; if (length < 256 ) length = 256 ; while (length < need) length <<= 1 ; if (buf->orig_buffer != buf->buffer) evbuffer_align(buf); if ((newbuf = realloc (buf->buffer, length)) == NULL ) return (-1 ); buf->orig_buffer = buf->buffer = newbuf; buf->totallen = length; } return (0 ); }
清理数据时,只是对 buffer 指向的位置和 off、misalign 的值进行调整,并不真的去做内存操作,能够减小内存操作的开销。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 * @brief 把 evbuffer 的实际数据清理一段 * * @param buf 要被清空的 evbuffer * @param len 要清空的实际数据长度 */ void evbuffer_drain (struct evbuffer *buf, size_t len) { size_t oldoff = buf->off; if (len >= buf->off) { buf->off = 0 ; buf->buffer = buf->orig_buffer; buf->misalign = 0 ; goto done; } buf->buffer += len; buf->misalign += len; buf->off -= len; done: if (buf->off != oldoff && buf->cb != NULL ) (*buf->cb)(buf, oldoff, buf->off, buf->cbarg); }
数据写入或者删除操作完成后,都会尝试调用回调函数。这样从 fd 中读取数据到 evbuffer 中之后,业务逻辑可以在回调中获取数据操作前后的大小等信息,从而根据实际情况决定下一步动作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 int evbuffer_add (struct evbuffer *buf, void *data, size_t datlen) { if (datlen && buf->cb != NULL ) (*buf->cb)(buf, oldoff, buf->off, buf->cbarg); } void evbuffer_drain (struct evbuffer *buf, size_t len) { if (buf->off != oldoff && buf->cb != NULL ) (*buf->cb)(buf, oldoff, buf->off, buf->cbarg); }
bufferevent 基于 evbuffer,bufferevent 可以为 fd 封装一套数据读写的事件注册到 event_base 上,并提供了读写高低水位线的功能。下面是 bufferevent 的工作流程。
初始化一个 bufferevent 对象,设置好 读写缓冲区、读写事件、读写回调、出错回调。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 * @brief 初始化一个 bufferevent 对象,设置好 读写缓冲区、读写事件、读写回调、出错回调 * * @param fd 目标fd * @param readcb 读取回调 * @param writecb 写入回调 * @param errorcb 出错回调 * @param cbarg 回调时携带的参数 * @return struct bufferevent* */ struct bufferevent *bufferevent_new (int fd, evbuffercb readcb, evbuffercb writecb, everrorcb errorcb, void *cbarg) { struct bufferevent *bufev; if ((bufev = calloc (1 , sizeof (struct bufferevent))) == NULL ) return (NULL ); if ((bufev->input = evbuffer_new()) == NULL ) { free (bufev); return (NULL ); } if ((bufev->output = evbuffer_new()) == NULL ) { evbuffer_free(bufev->input); free (bufev); return (NULL ); } event_set(&bufev->ev_read, fd, EV_READ, bufferevent_readcb, bufev); event_set(&bufev->ev_write, fd, EV_WRITE, bufferevent_writecb, bufev); bufev->readcb = readcb; bufev->writecb = writecb; bufev->errorcb = errorcb; bufev->cbarg = cbarg; bufev->enabled = EV_READ | EV_WRITE; return (bufev); }
启动 bufferevent 的读写事件,将上面生成的读写 event 注册到 event_base 中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 * @brief 启用 bufferevent 的 可读/可写事件 * * @param bufev * @param event * @return int */ int bufferevent_enable (struct bufferevent *bufev, short event) { if (event & EV_READ) { if (bufferevent_add(&bufev->ev_read, bufev->timeout_read) == -1 ) return (-1 ); } if (event & EV_WRITE) { if (bufferevent_add(&bufev->ev_write, bufev->timeout_write) == -1 ) return (-1 ); } bufev->enabled |= event; return (0 ); } * @brief 将一个 event 添加到 event_base 中 * * @param ev event * @param timeout 超时时间,单位秒 * @return int */ static int bufferevent_add (struct event *ev, int timeout) { struct timeval tv, *ptv = NULL ; if (timeout) { timerclear(&tv); tv.tv_sec = timeout; ptv = &tv; } return (event_add(ev, ptv)); }
从 fd 读取数据时
当 evbuffer 缓冲区中的数据低于水位线,啥也不做
当 evbuffer 缓冲区中的数据高于低水位线,低于高水位线,调用用户设置的读取回调
当 evbuffer 缓冲区中的数据高于高水位线,将 fd 的可读事件从 event_base 上移除,并为 evbuffer 绑定一个回调,等待用户消费数据。回调中当前数据大小下降到高水位线以下时,移除此回调并重新将 fd 的可读事件注册回 event_base 中
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 * @brief eventbuffer 绑定的 fd 的可读回调,尝试从 fd 中读取数据放到 input 缓冲区中,并控制水位线 * * @param fd 目标 fd * @param event 可读 event * @param arg eventbuffer 对象指针 */ static void bufferevent_readcb (int fd, short event, void *arg) { len = EVBUFFER_LENGTH(bufev->input); if (bufev->wm_read.low != 0 && len < bufev->wm_read.low) return ; if (bufev->wm_read.high != 0 && len > bufev->wm_read.high) { struct evbuffer *buf = bufev->input; event_del(&bufev->ev_read); evbuffer_setcb(buf, bufferevent_read_pressure_cb, bufev); return ; } (*bufev->readcb)(bufev, bufev->cbarg); }
上面的代码中,当前数据大于高水位线时,为 evbuffer 绑定了 bufferevent_read_pressure_cb 回调函数,此函数中检测 evbuffer 做了数据操作之后,最新的数据大小,若小于高水位线了,则删掉 evbuffer 的回调,并将 fd 的可读事件重新注册回 event_base。
通过这种方式,用户就可以通过指定高低水位线的方式控制数据读取的量了,不需要自己管理 fd 的读写事件,非常方便。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 * @brief 当 bufferevent 的输入缓冲区 input 这个 evbuffer 中的数据被消费,导致实际数据长度在高水位线之下之后 * 将 bufferevent 的读取事件重新注册会 event_base 的监听,以继续从fd中读取数据 * * @param buf * @param old * @param now * @param arg */ void bufferevent_read_pressure_cb (struct evbuffer *buf, size_t old, size_t now, void *arg) { struct bufferevent *bufev = arg; * If we are below the watermak then reschedule reading if it's * still enabled. */ if (bufev->wm_read.high == 0 || now < bufev->wm_read.high) { evbuffer_setcb(buf, NULL , NULL ); if (bufev->enabled & EV_READ) bufferevent_add(&bufev->ev_read, bufev->timeout_read); } }
写数据时,只有低水位线起作用,将可写事件绑定到 event_base 上,可写触发时就尝试写入数据,如果还有没写完的,则不断循环绑定可写事件,直到数据写完。当数据写到小于写低水位线时,触发一次回调。
1 2 3 4 5 6 7 8 9 10 11 12 13 bufferevent_writecb(int fd, short event, void *arg) { if (EVBUFFER_LENGTH(bufev->output) != 0 ) bufferevent_add(&bufev->ev_write, bufev->timeout_write); if (EVBUFFER_LENGTH(bufev->output) <= bufev->wm_write.low) (*bufev->writecb)(bufev, bufev->cbarg); }
参考文档
本文链接:https://www.zoucz.com/blog/2022/06/20/f7dd6ef0-f0af-11ec-9fa0-5dbc93f9d3ee/