glib的事件循环
前言
QEMU和libvirt等虚拟化组件的事件循环架构都是基于glib的事件循环机制实现的,这里一同分析一下
glib
整个glib的事件循环架构由三个概念构成,即GMainLoop、GMainContext和GSource
GSource
glib用GSource表示每一个需要处理的事件源,其源代码如下所示1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23struct _GSourceFuncs
{
GSourceFuncsPrepareFunc prepare; /* Can be NULL */
GSourceFuncsCheckFunc check; /* Can be NULL */
GSourceFuncsDispatchFunc dispatch;
GSourceFuncsFinalizeFunc finalize; /* Can be NULL */
/*< private >*/
/* For use by g_source_set_closure */
GSourceFunc closure_callback;
GSourceDummyMarshal closure_marshal; /* Really is of type GClosureMarshal */
};
struct _GSource
{
...
const GSourceFuncs *source_funcs;
...
GMainContext *context;
...
GSList *poll_fds;
...
};
其中的重点是poll_fds字段和source_funcs字段。
linux中的事件指的是等待某个资源,而poll_fds则保存了事件源所等待的资源,例如文件描述符等。当glib使用poll系统调用判断关联的资源可用时,即表明有事件到达。
而source_funcs描述了在事件循环中如何操作定义的事件源。其中prepare在poll之前调用,用来检查是否已经有事件到达或准备后续poll所需要的资源;check在poll之后调用,用来确认是否有事件到达;dispatch在事件到达后用来处理事件;finalize在事件源注销时用来清理相关的资源。其各个操作的状态图如下所示
在初始状态中,首先调用自定义的prepare(),完成poll前的资源准备,状态转换为prepared;然后poll,状态转换为polling;在poll结束后,调用自定义的check(),确认事件源中所有可用的关联资源,状态转换为dispatching;最后,对于可用的资源调用dispatch(),完成到达事件处理
这里我们自定义一个事件源,让其等待标准输入描述符资源,相关代码如下所示
1 | typedef struct GSourceInput { |
可以看到,定义事件源就是定义上述的source_funcs。具体的,g_source_input_prepare()设置后续poll的超时时间为1s;而poll结束后,g_source_input_check()通过检查相关标志位判断标准输入描述符是否有输入;如果有,则继续调用g_source_input_dispatch()完成事件处理
最终效果如下图所示
GMainContext
考虑到用户可能会在一个线程中同时处理多个事件源(例如I/O线程),因此glib提供了GMainContext来简单的处理多个事件源,其结构如下所示1
2
3
4
5
6struct _GMainContext
{
...
GQueue source_lists;
...
};
其关键字段是source_lists,其将所有关联的事件源存储在该链表中,方便后续进行遍历
glib使用g_main_context_iteration()来便捷的对GMainContext下所有的GSource进行一轮事件循环,如下所示1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48gboolean
g_main_context_iteration (GMainContext *context, gboolean may_block)
{
...
retval = g_main_context_iterate_unlocked (context, may_block, TRUE, G_THREAD_SELF);
...
}
/* HOLDS context lock */
static gboolean
g_main_context_iterate_unlocked (GMainContext *context,
gboolean block,
gboolean dispatch,
GThread *self)
{
...
if (!context->cached_poll_array)
{
context->cached_poll_array_size = context->n_poll_records;
context->cached_poll_array = g_new (GPollFD, context->n_poll_records);
}
allocated_nfds = context->cached_poll_array_size;
fds = context->cached_poll_array;
g_main_context_prepare_unlocked (context, &max_priority);
while ((nfds = g_main_context_query_unlocked (
context, max_priority, &timeout_usec, fds,
allocated_nfds)) > allocated_nfds)
{
g_free (fds);
context->cached_poll_array_size = allocated_nfds = nfds;
context->cached_poll_array = fds = g_new (GPollFD, nfds);
}
if (!block)
timeout_usec = 0;
g_main_context_poll_unlocked (context, timeout_usec, max_priority, fds, nfds);
some_ready = g_main_context_check_unlocked (context, max_priority, fds, nfds);
if (dispatch)
g_main_context_dispatch_unlocked (context);
...
return some_ready;
}
可以看到,其一次事件循环和前面GSource章节介绍的单个事件源循环是一致的,即包括prepare、poll、check和dispatch等步骤,只是GMainContext是对其下的多个GSource进行操作,以g_main_context_prepare_unlocked()为例1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76static gboolean
g_main_context_prepare_unlocked (GMainContext *context,
gint *priority)
{
...
g_source_iter_init (&iter, context, TRUE);
while (g_source_iter_next (&iter, &source))
{
gint64 source_timeout_usec = -1;
if (!(source->flags & G_SOURCE_READY))
{
gboolean result;
gboolean (* prepare) (GSource *source,
gint *timeout);
prepare = source->source_funcs->prepare;
if (prepare)
{
gint64 begin_time_nsec G_GNUC_UNUSED;
int source_timeout_msec = -1;
context->in_check_or_prepare++;
UNLOCK_CONTEXT (context);
begin_time_nsec = G_TRACE_CURRENT_TIME;
result = (*prepare) (source, &source_timeout_msec);
TRACE (GLIB_MAIN_AFTER_PREPARE (source, prepare, source_timeout_msec));
source_timeout_usec = extend_timeout_to_usec (source_timeout_msec);
g_trace_mark (begin_time_nsec, G_TRACE_CURRENT_TIME - begin_time_nsec,
"GLib", "GSource.prepare",
"%s ⇒ %s",
(g_source_get_name (source) != NULL) ? g_source_get_name (source) : "(unnamed)",
result ? "ready" : "unready");
LOCK_CONTEXT (context);
context->in_check_or_prepare--;
}
else
result = FALSE;
...
if (result)
{
GSource *ready_source = source;
while (ready_source)
{
ready_source->flags |= G_SOURCE_READY;
ready_source = ready_source->priv->parent_source;
}
}
}
if (source->flags & G_SOURCE_READY)
{
n_ready++;
current_priority = source->priority;
context->timeout_usec = 0;
}
if (source_timeout_usec >= 0)
{
if (context->timeout_usec < 0)
context->timeout_usec = source_timeout_usec;
else
context->timeout_usec = MIN (context->timeout_usec, source_timeout_usec);
}
}
g_source_iter_clear (&iter);
...
return (n_ready > 0);
}
可以看到,其确实会调用每一个GSource的prepare函数指针并根据返回值进行相关操作
GMainLoop
前面GMainContext仅仅提供了一次事件循环的接口,而glib使用GMainLoop进行多次的时间循环,其结构如下所示1
2
3
4
5
6struct _GMainLoop
{
GMainContext *context;
gboolean is_running; /* (atomic) */
gint ref_count; /* (atomic) */
};
其使用g_main_loop_run()作为多次循环的接口,如下所示1
2
3
4
5
6
7
8
9
10
11
12
13void
g_main_loop_run (GMainLoop *loop)
{
...
g_atomic_int_set (&loop->is_running, TRUE);
while (g_atomic_int_get (&loop->is_running))
g_main_context_iterate_unlocked (loop->context, TRUE, TRUE, self);
g_main_context_release_unlocked (loop->context);
UNLOCK_CONTEXT (loop->context);
...
}
将前面自定义的GSource结合其余部分进行整理,即可得到glib的事件循环demo,执行tar -gxvf glib_event_loop.tar.gz && make -C glib_event_loop
即可完成编译运行
qemu
Qemu使用事件循环机制可以提高设备模拟的效率。具体的,Qemu的线程模型如下所示
Qemu中有若干个线程,其中main loop线程会不断监听各种事件,iothread会单独用来处理设备I/O操作,每一个guest cpu都会有一个vcpu线程用来执行guest代码和设备模拟,还有一些诸如热迁移migration线程和远程连接VNC线程等辅助线程
当guest访问设备时,vcpu线程会捕获该访问并在vcpu线程中调用设备的相关回调函数。在设备的回调函数返回之前,vcpu线程无法恢复guest的代码执行,即设备的模拟会阻塞vcpu线程的执行
得益于事件循环机制,当guest访问设备时,vcpu线程会将设备模拟的耗时操作通过事件循环机制通知主循环线程或iothread线程,然后立即返回guest的代码执行。这样避免了设备模拟对于vcpu线程的阻塞,提高了guest的性能
自定义GSource
Qemu基于glib的事件循环机制,自定义了Qemu的事件源struct AioContext,如下所示
1 | struct AioContext { |
可以看到,其符合前面glib自定义事件源的数据格式。其中AioContext事件源主要关心三类资源:
- struct AioHandler该资源即文件描述符类资源,提供了文件描述符的读/写回调函数用来处理事件
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20struct AioHandler {
GPollFD pfd;
IOHandler *io_read;
IOHandler *io_write;
AioPollFn *io_poll;
IOHandler *io_poll_ready;
IOHandler *io_poll_begin;
IOHandler *io_poll_end;
void *opaque;
QLIST_ENTRY(AioHandler) node;
QLIST_ENTRY(AioHandler) node_ready; /* only used during aio_poll() */
QLIST_ENTRY(AioHandler) node_deleted;
QLIST_ENTRY(AioHandler) node_poll;
QSLIST_ENTRY(AioHandler) node_submitted;
unsigned flags; /* see fdmon-io_uring.c */
int64_t poll_idle_timeout; /* when to stop userspace polling */
bool poll_ready; /* has polling detected an event? */
}; - struct QEMUBH这是Qemu模拟的内核的中断处理机制,即中断处理的bottom-half部分,用来实现异步调用功能。
1
2
3
4
5
6
7
8
9struct QEMUBH {
AioContext *ctx;
const char *name;
QEMUBHFunc *cb;
void *opaque;
QSLIST_ENTRY(QEMUBH) next;
unsigned flags;
MemReentrancyGuard *reentrancy_guard;
};
概括来说,Qemu可以注册一个QEMUBH资源,并异步地设置AioHandler的notifier字段,用来通知该资源可用,从而调用QEMUBH的cb回调逻辑 - struct QEMUTimer即定时器资源,当超时时调用定时器的cb回调函数来处理事件
1
2
3
4
5
6
7
8
9struct QEMUTimer {
int64_t expire_time; /* in nanoseconds */
QEMUTimerList *timer_list;
QEMUTimerCB *cb;
void *opaque;
QEMUTimer *next;
int attributes;
int scale;
};
aio_source_funcs
参考前面glib的GSource小节,Qemu中AioContext自定义事件源的操作接口是aio_source_funcs()1
2
3
4
5
6static GSourceFuncs aio_source_funcs = {
aio_ctx_prepare,
aio_ctx_check,
aio_ctx_dispatch,
aio_ctx_finalize
};
这里我们分析一下aio_ctx_prepare()、aio_ctx_check和aio_ctx_dispatch(),来更好的理解qemu的事件循环流程
aio_ctx_prepare
1 | static gboolean |
可以看到,如果没有即时事件,则设置poll为QEMUBH和定时器等的最小超时时间即可
aio_ctx_check
1 | static gboolean |
可以看到,其检查了前面AioContext中关注的资源使用可用,即aio_handlers对应的文件描述符资源、bh_list对应的QEMUBH资源和tlg对应的定时器资源
aio_ctx_dispatch
1 | static gboolean |
可以看到,其会依次调用可用文件描述符资源、QEMUBH资源和定时器资源的回调函数
事件循环
由于Qemu自定义的事件源AioContext比较复杂,因此Qemu并没有直接使用glib的g_main_loop_run()接口进行事件循环,而是
使用自定义的qemu_main_loop(),如下所示
1 | //#0 qemu_main_loop () at ../system/runstate.c:779 |
可以看到,类似于前面GMainContext的一轮事件循环,Qemu在os_host_main_loop_wait()中完成一轮事件循环。
具体的,Qemu在glib_pollfds_fill()中获取poll的超时时间和文件描述符,然后在qemu_poll_ns()进行poll,并在glib_pollfds_poll中对可用资源进行处理
前面介绍了AioContext事件源关心三类资源,这里具体分析一下这三类资源是如何完成事件循环的
AioHandler
Qemu使用aio_set_fd_handler()向AioContext中添加资源,如下所示
1 | void aio_set_fd_handler(AioContext *ctx, |
可以看到,Qemu会创建对应的AioHandler并插入到ctx->aio_handlers链表中,并将文件描述符使用g_source_add_poll()绑定到AioContext自定义事件源
根据前面的分析,在事件循环中,glib_pollfds_fill()会通过g_main_context_query()将该绑定的文件描述符填充到poll数组,并在qemu_poll_ns中进行poll操作,最后在glib_pollfds_poll()中检查ctx->aio_handlers并完成回调函数的相关处理
这里特别分析一下最后调用的aio_notify(),如下所示1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38void aio_notify(AioContext *ctx)
{
/*
* Write e.g. ctx->bh_list before writing ctx->notified. Pairs with
* smp_mb() in aio_notify_accept().
*/
smp_wmb();
qatomic_set(&ctx->notified, true);
/*
* Write ctx->notified (and also ctx->bh_list) before reading ctx->notify_me.
* Pairs with smp_mb() in aio_ctx_prepare or aio_poll.
*/
smp_mb();
if (qatomic_read(&ctx->notify_me)) {
event_notifier_set(&ctx->notifier);
}
}
int event_notifier_set(EventNotifier *e)
{
static const uint64_t value = 1;
ssize_t ret;
if (!e->initialized) {
return -1;
}
do {
ret = write(e->wfd, &value, sizeof(value));
} while (ret < 0 && errno == EINTR);
/* EAGAIN is fine, a read must be pending. */
if (ret < 0 && errno != EAGAIN) {
return -errno;
}
return 0;
}
其主要逻辑就是向AioContext的notifier字段写入数据。而notifier是eventfd系统调用的包装,AioContext用其来立即退出poll并重新进入新的事件循环,避免在glib_pollfds_fill()之后添加的资源一直等不到处理,如下所示1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43AioContext *aio_context_new(Error **errp)
{
int ret;
AioContext *ctx;
ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
...
ret = event_notifier_init(&ctx->notifier, false);
if (ret < 0) {
error_setg_errno(errp, -ret, "Failed to initialize event notifier");
goto fail;
}
...
aio_set_event_notifier(ctx, &ctx->notifier,
aio_context_notifier_cb,
aio_context_notifier_poll,
aio_context_notifier_poll_ready);
...
}
int event_notifier_init(EventNotifier *e, int active)
{
int fds[2];
int ret;
ret = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
...
e->rfd = e->wfd = ret;
e->initialized = true;
...
return 0;
}
void aio_set_event_notifier(AioContext *ctx,
EventNotifier *notifier,
EventNotifierHandler *io_read,
AioPollFn *io_poll,
EventNotifierHandler *io_poll_ready)
{
aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
(IOHandler *)io_read, NULL, io_poll,
(IOHandler *)io_poll_ready, notifier);
}
具体的,在创建AioContext时,其会通过eventfd系统调用创建notifier字段数据结构,并将notifier的文件描述符通过刚刚分析的aio_set_fd_handler()添加到该AioContext的关心资源。之后,每当有线程调用aio_notify(),添加的文件描述符就有可读资源,从而立即从poll中退出。
QEMUBH
Qemu使用aio_bh_enqueue()即向AioContext中添加资源,又用来产生事件,如下所示
1 | /* Called concurrently from any thread */ |
可以看到,其将QEMUBH插入到ctx->bh_list链表中然后调用aio_notify()。基于前面对aio_notify的介绍,其会立即退出poll,然后根据前面aio_ctx_check和前面aio_ctx_dispatch查看QEMUBH的flags然后进行相应的操作。
即如果aio_bh_enqueue()的new_flags字段包含BH_SCHEDULED,会使QEMUBH资源可用并在时间循环中迅速被处理
QEMUTimer
Qemu使用timer_mod_ns_locked()来向AioContext中添加资源,如下所示
1 | static bool timer_mod_ns_locked(QEMUTimerList *timer_list, |
可以看到,其设置QEMUTimer的超时时间后,添加到ctx->tlg中的相关链表中,会在expire_time后产生事件。
而根据前面aio_ctx_prepare可知,其设置的poll时间不大于所有的QEMUTimer超时时间,从而确保在glib_pollfds_poll()中能按时的处理事件