The glib event loop
Preface
The event loop architectures of virtualization components such as QEMU and libvirt are built on glib's event loop mechanism, so this post analyzes them together.
glib
The whole glib event loop architecture is built from three concepts: GMainLoop, GMainContext, and GSource.
GSource
glib represents every event source that needs handling as a GSource; its source code is shown below
struct _GSourceFuncs
{
GSourceFuncsPrepareFunc prepare; /* Can be NULL */
GSourceFuncsCheckFunc check; /* Can be NULL */
GSourceFuncsDispatchFunc dispatch;
GSourceFuncsFinalizeFunc finalize; /* Can be NULL */
/*< private >*/
/* For use by g_source_set_closure */
GSourceFunc closure_callback;
GSourceDummyMarshal closure_marshal; /* Really is of type GClosureMarshal */
};
struct _GSource
{
...
const GSourceFuncs *source_funcs;
...
GMainContext *context;
...
GSList *poll_fds;
...
};
The key fields here are poll_fds and source_funcs.
In Linux, an event means waiting on some resource; poll_fds stores the resources the event source waits on, such as file descriptors. When glib determines via the poll system call that an associated resource is ready, an event has arrived.
source_funcs describes how the event loop operates on the defined event source. prepare is called before poll to check whether an event has already arrived or to prepare resources needed for the upcoming poll; check is called after poll to confirm whether an event has arrived; dispatch handles the event once it has arrived; finalize cleans up related resources when the event source is destroyed. The state diagram of these operations is shown below
Starting from the initial state, the custom prepare() is called first to finish pre-poll preparation, moving the state to prepared; then poll runs, moving the state to polling; after poll returns, the custom check() is called to confirm which associated resources of the event source are ready, moving the state to dispatching; finally, dispatch() is called for the ready resources to handle the arrived events.
Here we define a custom event source that waits on the standard input file descriptor; the relevant code is shown below
typedef struct GSourceInput {
...
As you can see, defining an event source means defining the source_funcs described above. Concretely, g_source_input_prepare() sets the timeout of the upcoming poll to 1 s; after poll returns, g_source_input_check() inspects the relevant flags to decide whether the standard input descriptor has data; if it does, g_source_input_dispatch() is called to handle the event.
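Since the code block above is collapsed here, the following is a minimal sketch of what such a source could look like. Only the GSourceInput and g_source_input_* names come from the text; the field layout, the 1 s timeout and the stdin handling are assumptions made for illustration.
#include <glib.h>
#include <stdio.h>
#include <unistd.h>

/* Custom event source wrapping the stdin file descriptor */
typedef struct GSourceInput {
    GSource source;   /* must be the first member */
    GPollFD poll_fd;  /* registered with g_source_add_poll() */
} GSourceInput;

/* Called before poll: nothing pending yet, ask for a 1 s poll timeout */
static gboolean g_source_input_prepare(GSource *source, gint *timeout)
{
    *timeout = 1000;
    return FALSE;
}

/* Called after poll: ready when stdin was reported readable */
static gboolean g_source_input_check(GSource *source)
{
    GSourceInput *input = (GSourceInput *) source;
    return (input->poll_fd.revents & G_IO_IN) != 0;
}

/* Called for ready sources: consume stdin and run the user callback, if any */
static gboolean g_source_input_dispatch(GSource *source, GSourceFunc callback,
                                        gpointer user_data)
{
    char buf[128];
    ssize_t len = read(STDIN_FILENO, buf, sizeof(buf) - 1);

    if (len > 0) {
        buf[len] = '\0';
        printf("stdin: %s", buf);
    }
    return callback ? callback(user_data) : G_SOURCE_CONTINUE;
}

static GSourceFuncs g_source_input_funcs = {
    .prepare  = g_source_input_prepare,
    .check    = g_source_input_check,
    .dispatch = g_source_input_dispatch,
};

/* Create the source and register stdin for G_IO_IN */
GSource *g_source_input_new(void)
{
    GSource *source = g_source_new(&g_source_input_funcs, sizeof(GSourceInput));
    GSourceInput *input = (GSourceInput *) source;

    input->poll_fd.fd = STDIN_FILENO;
    input->poll_fd.events = G_IO_IN;
    g_source_add_poll(source, &input->poll_fd);
    return source;
}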
The final result is shown in the figure below
GMainContext
Since a user may want to handle multiple event sources in a single thread (an I/O thread, for example), glib provides GMainContext to manage multiple event sources conveniently; its structure is shown below
struct _GMainContext
{
...
GQueue source_lists;
...
};
Its key field is source_lists, which stores all associated event sources in one list so they can be iterated over later.
glib uses g_main_context_iteration() to conveniently run one round of the event loop over all GSources under a GMainContext, as shown below
gboolean
g_main_context_iteration (GMainContext *context, gboolean may_block)
{
...
retval = g_main_context_iterate_unlocked (context, may_block, TRUE, G_THREAD_SELF);
...
}
/* HOLDS context lock */
static gboolean
g_main_context_iterate_unlocked (GMainContext *context,
gboolean block,
gboolean dispatch,
GThread *self)
{
...
if (!context->cached_poll_array)
{
context->cached_poll_array_size = context->n_poll_records;
context->cached_poll_array = g_new (GPollFD, context->n_poll_records);
}
allocated_nfds = context->cached_poll_array_size;
fds = context->cached_poll_array;
g_main_context_prepare_unlocked (context, &max_priority);
while ((nfds = g_main_context_query_unlocked (
context, max_priority, &timeout_usec, fds,
allocated_nfds)) > allocated_nfds)
{
g_free (fds);
context->cached_poll_array_size = allocated_nfds = nfds;
context->cached_poll_array = fds = g_new (GPollFD, nfds);
}
if (!block)
timeout_usec = 0;
g_main_context_poll_unlocked (context, timeout_usec, max_priority, fds, nfds);
some_ready = g_main_context_check_unlocked (context, max_priority, fds, nfds);
if (dispatch)
g_main_context_dispatch_unlocked (context);
...
return some_ready;
}
As you can see, one iteration here matches the single-source loop described in the GSource section above, i.e. the prepare, poll, check and dispatch steps; the difference is that GMainContext performs them over all the GSources it holds. Take g_main_context_prepare_unlocked() as an example
static gboolean
g_main_context_prepare_unlocked (GMainContext *context,
gint *priority)
{
...
g_source_iter_init (&iter, context, TRUE);
while (g_source_iter_next (&iter, &source))
{
gint64 source_timeout_usec = -1;
if (!(source->flags & G_SOURCE_READY))
{
gboolean result;
gboolean (* prepare) (GSource *source,
gint *timeout);
prepare = source->source_funcs->prepare;
if (prepare)
{
gint64 begin_time_nsec G_GNUC_UNUSED;
int source_timeout_msec = -1;
context->in_check_or_prepare++;
UNLOCK_CONTEXT (context);
begin_time_nsec = G_TRACE_CURRENT_TIME;
result = (*prepare) (source, &source_timeout_msec);
TRACE (GLIB_MAIN_AFTER_PREPARE (source, prepare, source_timeout_msec));
source_timeout_usec = extend_timeout_to_usec (source_timeout_msec);
g_trace_mark (begin_time_nsec, G_TRACE_CURRENT_TIME - begin_time_nsec,
"GLib", "GSource.prepare",
"%s ⇒ %s",
(g_source_get_name (source) != NULL) ? g_source_get_name (source) : "(unnamed)",
result ? "ready" : "unready");
LOCK_CONTEXT (context);
context->in_check_or_prepare--;
}
else
result = FALSE;
...
if (result)
{
GSource *ready_source = source;
while (ready_source)
{
ready_source->flags |= G_SOURCE_READY;
ready_source = ready_source->priv->parent_source;
}
}
}
if (source->flags & G_SOURCE_READY)
{
n_ready++;
current_priority = source->priority;
context->timeout_usec = 0;
}
if (source_timeout_usec >= 0)
{
if (context->timeout_usec < 0)
context->timeout_usec = source_timeout_usec;
else
context->timeout_usec = MIN (context->timeout_usec, source_timeout_usec);
}
}
g_source_iter_clear (&iter);
...
return (n_ready > 0);
}
As you can see, it indeed calls every GSource's prepare function pointer and acts on the return value.
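To make the GMainContext usage concrete, here is a small self-contained glib example (illustrative only, not taken from QEMU or libvirt) that attaches a timeout source to its own context and drives ten iterations by hand:
#include <glib.h>
#include <stdio.h>

static gboolean on_tick(gpointer data)
{
    printf("tick\n");
    return G_SOURCE_CONTINUE;   /* keep the source installed */
}

int main(void)
{
    GMainContext *ctx = g_main_context_new();

    /* attach a 500 ms timeout source to this context */
    GSource *timeout = g_timeout_source_new(500);
    g_source_set_callback(timeout, on_tick, NULL, NULL);
    g_source_attach(timeout, ctx);

    /* each call runs one prepare/poll/check/dispatch round */
    for (int i = 0; i < 10; i++)
        g_main_context_iteration(ctx, TRUE /* may_block */);

    g_source_unref(timeout);
    g_main_context_unref(ctx);
    return 0;
}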
GMainLoop
The GMainContext above only provides an interface for a single round of the event loop; glib uses GMainLoop to run the event loop repeatedly. Its structure is shown below
struct _GMainLoop
{
GMainContext *context;
gboolean is_running; /* (atomic) */
gint ref_count; /* (atomic) */
};
It uses g_main_loop_run() as the interface for running the loop repeatedly, as shown below
void
g_main_loop_run (GMainLoop *loop)
{
...
g_atomic_int_set (&loop->is_running, TRUE);
while (g_atomic_int_get (&loop->is_running))
g_main_context_iterate_unlocked (loop->context, TRUE, TRUE, self);
g_main_context_release_unlocked (loop->context);
UNLOCK_CONTEXT (loop->context);
...
}
Putting the custom GSource defined above together with the remaining pieces gives a glib event loop demo; running tar -zxvf glib_event_loop.tar.gz && make -C glib_event_loop
builds and runs it
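For completeness, a minimal main() that ties such a custom source to a GMainLoop might look like the following sketch; g_source_input_new() is the hypothetical constructor from the earlier sketch, everything else is standard glib API.
#include <glib.h>

GSource *g_source_input_new(void);   /* from the earlier sketch */

int main(void)
{
    GMainContext *ctx = g_main_context_new();
    GMainLoop *loop = g_main_loop_new(ctx, FALSE);

    /* attach the custom stdin source sketched earlier */
    GSource *input = g_source_input_new();
    g_source_attach(input, ctx);

    /* iterates the context until g_main_loop_quit() is called */
    g_main_loop_run(loop);

    g_source_unref(input);
    g_main_loop_unref(loop);
    g_main_context_unref(ctx);
    return 0;
}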
qemu
QEMU uses the event loop mechanism to make device emulation more efficient. Concretely, QEMU's thread model is shown below
QEMU runs several threads: the main loop thread keeps listening for all kinds of events, iothreads are dedicated to handling device I/O, each guest CPU has a vcpu thread that executes guest code and performs device emulation, and there are auxiliary threads such as the live-migration thread and the VNC remote-connection thread.
When the guest accesses a device, the vcpu thread traps the access and invokes the device's callback in the vcpu thread. Until that callback returns, the vcpu thread cannot resume executing guest code, i.e. device emulation blocks the vcpu thread.
Thanks to the event loop mechanism, when the guest accesses a device the vcpu thread can hand the time-consuming part of device emulation over to the main loop thread or an iothread via the event loop and return to guest code execution immediately. This keeps device emulation from blocking the vcpu thread and improves guest performance.
Custom GSource
Building on glib's event loop mechanism, QEMU defines its own event source, struct AioContext, as shown below
struct AioContext {
...
As you can see, it follows the layout of a custom glib event source described above. The AioContext event source mainly cares about three kinds of resources:
- struct AioHandler: the file-descriptor type of resource, which provides read/write callbacks for a file descriptor to handle events
struct AioHandler {
GPollFD pfd;
IOHandler *io_read;
IOHandler *io_write;
AioPollFn *io_poll;
IOHandler *io_poll_ready;
IOHandler *io_poll_begin;
IOHandler *io_poll_end;
void *opaque;
QLIST_ENTRY(AioHandler) node;
QLIST_ENTRY(AioHandler) node_ready; /* only used during aio_poll() */
QLIST_ENTRY(AioHandler) node_deleted;
QLIST_ENTRY(AioHandler) node_poll;
QSLIST_ENTRY(AioHandler) node_submitted;
unsigned flags; /* see fdmon-io_uring.c */
int64_t poll_idle_timeout; /* when to stop userspace polling */
bool poll_ready; /* has polling detected an event? */
};
- struct QEMUBH: QEMU's analogue of the kernel's interrupt handling mechanism, i.e. the bottom-half part of interrupt handling, used to implement asynchronous calls.
struct QEMUBH {
AioContext *ctx;
const char *name;
QEMUBHFunc *cb;
void *opaque;
QSLIST_ENTRY(QEMUBH) next;
unsigned flags;
MemReentrancyGuard *reentrancy_guard;
};
In short, QEMU can register a QEMUBH resource and asynchronously set the AioContext's notifier field to signal that the resource is ready, which causes the QEMUBH's cb callback to run.
- struct QEMUTimer: the timer type of resource; when it expires, the timer's cb callback is invoked to handle the event
struct QEMUTimer {
int64_t expire_time; /* in nanoseconds */
QEMUTimerList *timer_list;
QEMUTimerCB *cb;
void *opaque;
QEMUTimer *next;
int attributes;
int scale;
};
aio_source_funcs
As in the glib GSource section above, the operations of QEMU's custom AioContext event source are given by aio_source_funcs
static GSourceFuncs aio_source_funcs = {
aio_ctx_prepare,
aio_ctx_check,
aio_ctx_dispatch,
aio_ctx_finalize
};
Here we look at aio_ctx_prepare(), aio_ctx_check() and aio_ctx_dispatch() to better understand QEMU's event loop flow.
aio_ctx_prepare
static gboolean
...
As you can see, if no event is immediately pending, it simply sets the poll timeout to the minimum of the QEMUBH and timer deadlines.
aio_ctx_check
static gboolean
...
As you can see, it checks whether the resources the AioContext cares about are ready, i.e. the file descriptor resources in aio_handlers, the QEMUBH resources in bh_list and the timer resources in tlg.
aio_ctx_dispatch
static gboolean
...
As you can see, it invokes in turn the callbacks of the ready file descriptor resources, QEMUBH resources and timer resources.
Event loop
Because QEMU's custom AioContext event source is fairly complex, QEMU does not drive its event loop directly through glib's g_main_loop_run(); instead it uses its own qemu_main_loop(), as shown below
//#0 qemu_main_loop () at ../system/runstate.c:779
...
As you can see, much like one round of a GMainContext iteration described earlier, QEMU completes one round of the event loop in os_host_main_loop_wait().
Concretely, QEMU collects the poll timeout and file descriptors in glib_pollfds_fill(), polls in qemu_poll_ns(), and handles the ready resources in glib_pollfds_poll()
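The pattern QEMU follows here is glib's documented way of embedding a GMainContext into an external poll loop. A standalone sketch of that pattern with the public glib API (not QEMU code; buffer size and error handling are simplified) looks like this:
#include <glib.h>
#include <poll.h>

/* One hand-rolled iteration over a GMainContext, mirroring the
 * prepare -> query -> poll -> check -> dispatch sequence. */
static void context_iterate_once(GMainContext *ctx)
{
    gint max_priority, timeout_ms, nfds;
    GPollFD fds[64];

    g_main_context_acquire(ctx);            /* required before dispatching */
    g_main_context_prepare(ctx, &max_priority);

    nfds = g_main_context_query(ctx, max_priority, &timeout_ms,
                                fds, G_N_ELEMENTS(fds));
    if (nfds > (gint) G_N_ELEMENTS(fds))
        nfds = G_N_ELEMENTS(fds);           /* a real loop would grow the array */

    /* QEMU replaces this step with qemu_poll_ns(); on Linux GPollFD has the
     * same layout as struct pollfd, so plain poll() works for the sketch */
    poll((struct pollfd *) fds, nfds, timeout_ms);

    if (g_main_context_check(ctx, max_priority, fds, nfds))
        g_main_context_dispatch(ctx);

    g_main_context_release(ctx);
}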
The previous section introduced the three kinds of resources the AioContext event source cares about; here we look at how each of them goes through the event loop.
AioHandler
QEMU uses aio_set_fd_handler() to add this resource to an AioContext, as shown below
void aio_set_fd_handler(AioContext *ctx,
...
As you can see, QEMU creates the corresponding AioHandler, inserts it into the ctx->aio_handlers list, and binds the file descriptor to the custom AioContext event source with g_source_add_poll().
Per the earlier analysis, during the event loop glib_pollfds_fill() copies the bound file descriptor into the poll array via g_main_context_query(), qemu_poll_ns() performs the poll, and finally glib_pollfds_poll() checks ctx->aio_handlers and runs the corresponding callbacks.
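As a usage-level illustration, a hedged sketch of registering a read handler on a pipe could look like the following; it assumes the seven-argument aio_set_fd_handler() form visible in the aio_set_event_notifier() call quoted later in this section, and the pipe handler itself is invented for the example.
/* Sketch only; it would only build inside the QEMU source tree. */
#include "qemu/osdep.h"
#include "block/aio.h"

static void pipe_read_cb(void *opaque)
{
    int fd = (int)(intptr_t) opaque;
    char buf[64];

    /* consume the data that made the fd readable */
    ssize_t len = read(fd, buf, sizeof(buf));
    (void) len;                 /* a real device would handle the payload here */
}

static void watch_pipe(AioContext *ctx, int pipe_rd_fd)
{
    aio_set_fd_handler(ctx, pipe_rd_fd,
                       pipe_read_cb,                    /* io_read */
                       NULL,                            /* io_write */
                       NULL,                            /* io_poll */
                       NULL,                            /* io_poll_ready */
                       (void *)(intptr_t) pipe_rd_fd);  /* opaque */
}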
It is worth looking specifically at the aio_notify() call at the end, shown below
void aio_notify(AioContext *ctx)
{
/*
* Write e.g. ctx->bh_list before writing ctx->notified. Pairs with
* smp_mb() in aio_notify_accept().
*/
smp_wmb();
qatomic_set(&ctx->notified, true);
/*
* Write ctx->notified (and also ctx->bh_list) before reading ctx->notify_me.
* Pairs with smp_mb() in aio_ctx_prepare or aio_poll.
*/
smp_mb();
if (qatomic_read(&ctx->notify_me)) {
event_notifier_set(&ctx->notifier);
}
}
int event_notifier_set(EventNotifier *e)
{
static const uint64_t value = 1;
ssize_t ret;
if (!e->initialized) {
return -1;
}
do {
ret = write(e->wfd, &value, sizeof(value));
} while (ret < 0 && errno == EINTR);
/* EAGAIN is fine, a read must be pending. */
if (ret < 0 && errno != EAGAIN) {
return -errno;
}
return 0;
}
Its main job is to write to the AioContext's notifier field. notifier is a wrapper around the eventfd system call; the AioContext uses it to break out of poll immediately and enter a new round of the event loop, so that resources added after glib_pollfds_fill() do not wait forever to be serviced, as shown below
AioContext *aio_context_new(Error **errp)
{
int ret;
AioContext *ctx;
ctx = (AioContext *) g_source_new(&aio_source_funcs, sizeof(AioContext));
...
ret = event_notifier_init(&ctx->notifier, false);
if (ret < 0) {
error_setg_errno(errp, -ret, "Failed to initialize event notifier");
goto fail;
}
...
aio_set_event_notifier(ctx, &ctx->notifier,
aio_context_notifier_cb,
aio_context_notifier_poll,
aio_context_notifier_poll_ready);
...
}
int event_notifier_init(EventNotifier *e, int active)
{
int fds[2];
int ret;
ret = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
...
e->rfd = e->wfd = ret;
e->initialized = true;
...
return 0;
}
void aio_set_event_notifier(AioContext *ctx,
EventNotifier *notifier,
EventNotifierHandler *io_read,
AioPollFn *io_poll,
EventNotifierHandler *io_poll_ready)
{
aio_set_fd_handler(ctx, event_notifier_get_fd(notifier),
(IOHandler *)io_read, NULL, io_poll,
(IOHandler *)io_poll_ready, notifier);
}
Concretely, when an AioContext is created it sets up the notifier field through the eventfd system call and adds the notifier's file descriptor to the AioContext's watched resources via the aio_set_fd_handler() just analyzed. From then on, whenever any thread calls aio_notify(), that file descriptor becomes readable and poll returns immediately.
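Outside of QEMU, the same wake-up trick can be demonstrated with a few lines of plain Linux code. This standalone sketch (illustrative only) uses an eventfd to interrupt a blocking poll() from another thread:
#include <sys/eventfd.h>
#include <poll.h>
#include <pthread.h>
#include <unistd.h>
#include <stdint.h>
#include <stdio.h>

static int efd;

/* the "aio_notify" side: make the eventfd readable */
static void *notifier_thread(void *arg)
{
    uint64_t one = 1;

    sleep(1);
    write(efd, &one, sizeof(one));   /* wakes up the poller below */
    return NULL;
}

int main(void)
{
    struct pollfd pfd;
    pthread_t tid;
    uint64_t val;

    efd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    pthread_create(&tid, NULL, notifier_thread, NULL);

    pfd.fd = efd;
    pfd.events = POLLIN;
    poll(&pfd, 1, -1);               /* would block forever without the notify */

    read(efd, &val, sizeof(val));    /* consume the event, like aio_notify_accept() */
    printf("woken up, counter=%llu\n", (unsigned long long) val);

    pthread_join(tid, NULL);
    close(efd);
    return 0;
}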
QEMUBH
QEMU uses aio_bh_enqueue() both to add this resource to an AioContext and to generate the event, as shown below
/* Called concurrently from any thread */
...
As you can see, it inserts the QEMUBH into the ctx->bh_list list and then calls aio_notify(). Based on the description of aio_notify() above, poll is exited immediately, and then, per the aio_ctx_check and aio_ctx_dispatch sections, the QEMUBH's flags are examined and acted on accordingly.
In other words, if aio_bh_enqueue()'s new_flags argument contains BH_SCHEDULED, the QEMUBH resource becomes ready and is handled promptly in the event loop.
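From an API user's point of view, bottom halves are usually created with aio_bh_new() and armed with qemu_bh_schedule(), which ends up in aio_bh_enqueue() with BH_SCHEDULED. The sketch below shows the idea of deferring work from a vcpu thread to the thread that owns the AioContext; the device names and callbacks are invented for illustration.
/* Sketch only; it would only build inside the QEMU source tree. */
#include "qemu/osdep.h"
#include "block/aio.h"

static QEMUBH *demo_bh;

/* Runs later in the thread that owns the AioContext (e.g. the main loop) */
static void demo_bh_cb(void *opaque)
{
    /* do the slow part of the device emulation here */
}

static void demo_device_init(AioContext *ctx, void *dev)
{
    demo_bh = aio_bh_new(ctx, demo_bh_cb, dev);
}

/* Called from a vcpu thread on a trapped device access: schedule and return */
static void demo_device_mmio_write(void *dev)
{
    qemu_bh_schedule(demo_bh);
}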
QEMUTimer
QEMU uses timer_mod_ns_locked() to add this resource to an AioContext, as shown below
static bool timer_mod_ns_locked(QEMUTimerList *timer_list,
...
As you can see, after setting the QEMUTimer's expiry it adds the timer to the relevant list in ctx->tlg, and an event is generated once expire_time has passed.
And as aio_ctx_prepare showed earlier, the poll timeout it sets never exceeds any QEMUTimer's deadline, which ensures the event is handled on time in glib_pollfds_poll().
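At the API level, device code usually reaches timer_mod_ns_locked() through timer_new_ns() and timer_mod(); a hedged sketch (the callback and the 10 ms deadline are invented for illustration):
/* Sketch only; it would only build inside the QEMU source tree. */
#include "qemu/osdep.h"
#include "qemu/timer.h"

static QEMUTimer *demo_timer;

static void demo_timer_cb(void *opaque)
{
    /* timer expired: handle the event, optionally re-arm with timer_mod() */
}

static void demo_timer_start(void *opaque)
{
    demo_timer = timer_new_ns(QEMU_CLOCK_VIRTUAL, demo_timer_cb, opaque);

    /* fire 10 ms from now; inserting the timer into its QEMUTimerList goes
     * through timer_mod_ns_locked() under the hood */
    timer_mod(demo_timer,
              qemu_clock_get_ns(QEMU_CLOCK_VIRTUAL) + 10 * 1000 * 1000);
}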
libvirt
The libvirt project is fairly complex and related material is scarce, so it is comparatively hard to analyze.
Here is libvirt's overall architecture first, as shown below
As you can see, the core of libvirt is libvirtd, and the core of libvirtd is an event loop plus a thread pool, as shown below
Event loop
libvirt uses glib's event loop mechanism directly, as virNetDaemonRun() shows
void
...
According to the comments there, libvirt uses virEventAddHandle() and virEventAddTimeout() to add the resources it cares about.
File descriptor event source
virEventAddHandle() binds a virEventGLibFDSourceFuncs event source to watch file-descriptor resources
/**
...
As you can see, virEventAddHandle() ultimately calls glib's g_source_attach() and friends to add the custom event source.
The events parameter specifies which file descriptor events may trigger the event loop, e.g. the descriptor having readable data or being writable, as virEventGLibEventsToCondition() shows; it can be changed later with virEventUpdateHandle()
static GIOCondition
...
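For orientation, this is roughly how a watch on a file descriptor is registered through libvirt's public virEvent API; a hedged, self-contained sketch (the stdin callback is invented for illustration, and the default event loop implementation is registered explicitly):
/* Sketch using libvirt's public event API (link with -lvirt). */
#include <libvirt/libvirt.h>
#include <libvirt/libvirt-event.h>
#include <stdio.h>
#include <unistd.h>

static void stdin_cb(int watch, int fd, int events, void *opaque)
{
    char buf[128];

    if (events & VIR_EVENT_HANDLE_READABLE) {
        ssize_t len = read(fd, buf, sizeof(buf) - 1);
        if (len > 0) {
            buf[len] = '\0';
            printf("stdin: %s", buf);
        }
    }
}

int main(void)
{
    virEventRegisterDefaultImpl();   /* install libvirt's default (glib-based) loop */

    if (virEventAddHandle(STDIN_FILENO, VIR_EVENT_HANDLE_READABLE,
                          stdin_cb, NULL, NULL) < 0)
        return 1;

    for (;;)
        virEventRunDefaultImpl();    /* one iteration per call, like virNetDaemonRun() */

    return 0;
}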
Timer event source
virEventAddTimeout() binds a timer event source to add timer resources
/**
...
As you can see, libvirt ultimately creates the timer event source with glib's g_timeout_source_new() and attaches it with g_source_attach()
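Correspondingly, a timer goes through the same public API; a small hedged sketch (the callback and the 1000 ms period are invented for illustration):
/* Sketch using libvirt's public event API (link with -lvirt). */
#include <libvirt/libvirt.h>
#include <libvirt/libvirt-event.h>
#include <stdio.h>

static void tick_cb(int timer, void *opaque)
{
    printf("timer %d fired\n", timer);
}

int main(void)
{
    virEventRegisterDefaultImpl();

    /* fire every 1000 ms; virEventUpdateTimeout() can change the period later */
    if (virEventAddTimeout(1000, tick_cb, NULL, NULL) < 0)
        return 1;

    for (;;)
        virEventRunDefaultImpl();

    return 0;
}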
Thread pool
To keep the event loop from being blocked in the dispatch phase, libvirt offloads the event handling logic to worker threads, and the event loop thread is only used to wake the workers up. The overall flow is roughly:
- virsh calls a libvirt API
- the remote driver encodes the API call into an RPC message and sends it to libvirtd
- libvirtd's event loop poll notices that a watched resource is ready
- the event loop wakes up a worker thread, which processes the RPC message
- the worker thread returns the result to the virsh client
virThreadPoolNewFull()
libvirt creates the thread pool with virThreadPoolNewFull()
virThreadPool *
...
As you can see, libvirt creates threads in virThreadCreateFull() that run the virThreadPoolWorker() logic, and virThreadPoolWorker() simply executes the jobs on jobList one after another.
libvirt then uses virThreadPoolSendJob() to add a job and wake up the thread pool
int virThreadPoolSendJob(virThreadPool *pool,
...
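virThreadPool itself is not quoted in full here; as a generic illustration of the pattern (a mutex, a condition variable and a job queue, not libvirt's actual code), a minimal worker pool could look like this:
#include <pthread.h>
#include <stdlib.h>

/* A deliberately tiny job queue, mirroring the hand-off between
 * virThreadPoolSendJob() and virThreadPoolWorker(). */
typedef struct Job {
    void (*func)(void *data);
    void *data;
    struct Job *next;
} Job;

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t  cond = PTHREAD_COND_INITIALIZER;
static Job *job_list;

/* the virThreadPoolWorker() counterpart: sleep until a job arrives, then run it */
static void *worker(void *arg)
{
    for (;;) {
        pthread_mutex_lock(&lock);
        while (!job_list)
            pthread_cond_wait(&cond, &lock);
        Job *job = job_list;
        job_list = job->next;
        pthread_mutex_unlock(&lock);

        job->func(job->data);
        free(job);
    }
    return NULL;
}

/* the virThreadPoolSendJob() counterpart: queue a job and wake one worker */
static void send_job(void (*func)(void *), void *data)
{
    Job *job = malloc(sizeof(*job));

    job->func = func;
    job->data = data;

    pthread_mutex_lock(&lock);
    job->next = job_list;
    job_list = job;
    pthread_cond_signal(&cond);
    pthread_mutex_unlock(&lock);
}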
Example
Here we take libvirtd handling an RPC message as an example of libvirt's event loop in action
libvirtd creates the RPC thread pool in virNetServerNew()
virNetServer *
virNetServerNew(const char *name,
unsigned long long next_client_id,
size_t min_workers,
size_t max_workers,
size_t priority_workers,
size_t max_clients,
size_t max_anonymous_clients,
int keepaliveInterval,
unsigned int keepaliveCount,
virNetServerClientPrivNew clientPrivNew,
virNetServerClientPrivPreExecRestart clientPrivPreExecRestart,
virFreeCallback clientPrivFree,
void *clientPrivOpaque)
{
g_autoptr(virNetServer) srv = NULL;
g_autofree char *jobName = g_strdup_printf("rpc-%s", name);
if (max_clients < max_anonymous_clients) {
virReportError(VIR_ERR_INTERNAL_ERROR, "%s",
_("The overall maximum number of clients must not be less than the number of clients waiting for authentication"));
return NULL;
}
if (virNetServerInitialize() < 0)
return NULL;
if (!(srv = virObjectLockableNew(virNetServerClass)))
return NULL;
if (!(srv->workers = virThreadPoolNewFull(min_workers, max_workers,
priority_workers,
virNetServerHandleJob,
jobName,
NULL,
srv)))
return NULL;
srv->name = g_strdup(name);
srv->next_client_id = next_client_id;
srv->nclients_max = max_clients;
srv->nclients_unauth_max = max_anonymous_clients;
srv->keepaliveInterval = keepaliveInterval;
srv->keepaliveCount = keepaliveCount;
srv->clientPrivNew = clientPrivNew;
srv->clientPrivPreExecRestart = clientPrivPreExecRestart;
srv->clientPrivFree = clientPrivFree;
srv->clientPrivOpaque = clientPrivOpaque;
return g_steal_pointer(&srv);
}
static void
virNetServerHandleJob(void *jobOpaque,
void *opaque)
{
virNetServer *srv = opaque;
virNetServerJob *job = jobOpaque;
...
if (virNetServerProcessMsg(srv, job->client, job->prog, job->msg) < 0)
goto error;
virObjectUnref(job->prog);
virObjectUnref(job->client);
VIR_FREE(job);
return;
...
}
static int
virNetServerProcessMsg(virNetServer *srv,
virNetServerClient *client,
virNetServerProgram *prog,
virNetMessage *msg)
{
...
if (virNetServerProgramDispatch(prog,
srv,
client,
msg) < 0)
return -1;
return 0;
}
/*
* @server: the unlocked server object
* @client: the unlocked client object
* @msg: the complete incoming message packet, with header already decoded
*
* This function is intended to be called from worker threads
* when an incoming message is ready to be dispatched for
* execution.
*
* Upon successful return the '@msg' instance will be released
* by this function (or more often, reused to send a reply).
* Upon failure, the '@msg' must be freed by the caller.
*
* Returns 0 if the message was dispatched, -1 upon fatal error
*/
int virNetServerProgramDispatch(virNetServerProgram *prog,
virNetServer *server,
virNetServerClient *client,
virNetMessage *msg)
{
...
switch (msg->header.type) {
case VIR_NET_CALL:
case VIR_NET_CALL_WITH_FDS:
ret = virNetServerProgramDispatchCall(prog, server, client, msg);
break;
...
}
return ret;
...
}
/*
* @server: the unlocked server object
* @client: the unlocked client object
* @msg: the complete incoming method call, with header already decoded
*
* This method is used to dispatch a message representing an
* incoming method call from a client. It decodes the payload
* to obtain method call arguments, invokes the method and
* then sends a reply packet with the return values
*
* Returns 0 if the reply was sent, or -1 upon fatal error
*/
static int
virNetServerProgramDispatchCall(virNetServerProgram *prog,
virNetServer *server,
virNetServerClient *client,
virNetMessage *msg)
{
g_autofree char *arg = NULL;
g_autofree char *ret = NULL;
int rv = -1;
virNetServerProgramProc *dispatcher = NULL;
virNetMessageError rerr = { 0 };
size_t i;
g_autoptr(virIdentity) identity = NULL;
if (msg->header.status != VIR_NET_OK) {
virReportError(VIR_ERR_RPC,
_("Unexpected message status %1$u"),
msg->header.status);
goto error;
}
dispatcher = virNetServerProgramGetProc(prog, msg->header.proc);
if (!dispatcher) {
virReportError(VIR_ERR_RPC,
_("unknown procedure: %1$d"),
msg->header.proc);
goto error;
}
/* If the client is not authenticated, don't allow any RPC ops
* which are except for authentication ones */
if (dispatcher->needAuth &&
!virNetServerClientIsAuthenticated(client)) {
/* Explicitly *NOT* calling remoteDispatchAuthError() because
we want back-compatibility with libvirt clients which don't
support the VIR_ERR_AUTH_FAILED error code */
virReportError(VIR_ERR_RPC,
"%s", _("authentication required"));
goto error;
}
arg = g_new0(char, dispatcher->arg_len);
ret = g_new0(char, dispatcher->ret_len);
if (virNetMessageDecodePayload(msg, dispatcher->arg_filter, arg) < 0)
goto error;
if (!(identity = virNetServerClientGetIdentity(client)))
goto error;
if (virIdentitySetCurrent(identity) < 0)
goto error;
/*
* When the RPC handler is called:
*
* - Server object is unlocked
* - Client object is unlocked
*
* Without locking, it is safe to use:
*
* 'args and 'ret'
*/
rv = (dispatcher->func)(server, client, msg, &rerr, arg, ret);
if (virIdentitySetCurrent(NULL) < 0)
goto error;
/*
* If rv == 1, this indicates the dispatch func has
* populated 'msg' with a list of FDs to return to
* the caller.
*
* Otherwise we must clear out the FDs we got from
* the client originally.
*
*/
if (rv != 1) {
for (i = 0; i < msg->nfds; i++)
VIR_FORCE_CLOSE(msg->fds[i]);
VIR_FREE(msg->fds);
msg->nfds = 0;
}
if (rv < 0)
goto error;
/* Return header. We're re-using same message object, so
* only need to tweak type/status fields */
/*msg->header.prog = msg->header.prog;*/
/*msg->header.vers = msg->header.vers;*/
/*msg->header.proc = msg->header.proc;*/
msg->header.type = msg->nfds ? VIR_NET_REPLY_WITH_FDS : VIR_NET_REPLY;
/*msg->header.serial = msg->header.serial;*/
msg->header.status = VIR_NET_OK;
if (virNetMessageEncodeHeader(msg) < 0)
goto error;
if (msg->nfds &&
virNetMessageEncodeNumFDs(msg) < 0)
goto error;
if (virNetMessageEncodePayload(msg, dispatcher->ret_filter, ret) < 0)
goto error;
xdr_free(dispatcher->arg_filter, arg);
xdr_free(dispatcher->ret_filter, ret);
/* Put reply on end of tx queue to send out */
return virNetServerClientSendMessage(client, msg);
}
Whenever a job arrives, a thread in the pool runs the virNetServerHandleJob() logic to process it; that logic ultimately handles the RPC message in virNetServerProgramDispatchCall() and pushes the result onto the tx queue
After the thread pool has been created, libvirtd binds the listening socket resource in daemonSetupNetworking(), so that the thread pool can be woken up to handle it once the resource becomes ready
static int ATTRIBUTE_NONNULL(3)
daemonSetupNetworking(virNetServer *srv,
virNetServer *srvAdm,
struct daemonConfig *config,
const char *sock_path,
const char *sock_path_ro,
const char *sock_path_adm)
{
gid_t unix_sock_gid = 0;
int unix_sock_ro_mask = 0;
int unix_sock_rw_mask = 0;
int unix_sock_adm_mask = 0;
g_autoptr(virSystemdActivation) act = NULL;
if (virSystemdGetActivation(&act) < 0)
return -1;
if (config->unix_sock_group) {
if (virGetGroupID(config->unix_sock_group, &unix_sock_gid) < 0)
return -1;
}
if (virStrToLong_i(config->unix_sock_ro_perms, NULL, 8, &unix_sock_ro_mask) != 0) {
VIR_ERROR(_("Failed to parse mode '%1$s'"), config->unix_sock_ro_perms);
return -1;
}
if (virStrToLong_i(config->unix_sock_admin_perms, NULL, 8, &unix_sock_adm_mask) != 0) {
VIR_ERROR(_("Failed to parse mode '%1$s'"), config->unix_sock_admin_perms);
return -1;
}
if (virStrToLong_i(config->unix_sock_rw_perms, NULL, 8, &unix_sock_rw_mask) != 0) {
VIR_ERROR(_("Failed to parse mode '%1$s'"), config->unix_sock_rw_perms);
return -1;
}
if (virNetServerAddServiceUNIX(srv,
act,
DAEMON_NAME ".socket",
sock_path,
unix_sock_rw_mask,
unix_sock_gid,
config->auth_unix_rw,
NULL,
false,
config->max_queued_clients,
config->max_client_requests) < 0)
return -1;
if (sock_path_ro &&
virNetServerAddServiceUNIX(srv,
act,
DAEMON_NAME "-ro.socket",
sock_path_ro,
unix_sock_ro_mask,
unix_sock_gid,
config->auth_unix_ro,
NULL,
true,
config->max_queued_clients,
config->max_client_requests) < 0)
return -1;
if (sock_path_adm &&
virNetServerAddServiceUNIX(srvAdm,
act,
DAEMON_NAME "-admin.socket",
sock_path_adm,
unix_sock_adm_mask,
unix_sock_gid,
REMOTE_AUTH_NONE,
NULL,
false,
config->admin_max_queued_clients,
config->admin_max_client_requests) < 0)
return -1;
if (act &&
virSystemdActivationComplete(act) < 0)
return -1;
return 0;
}
int
virNetServerAddServiceUNIX(virNetServer *srv,
virSystemdActivation *act,
const char *actname,
const char *path,
mode_t mask,
gid_t grp,
int auth,
virNetTLSContext *tls,
bool readonly,
size_t max_queued_clients,
size_t nrequests_client_max)
{
virNetServerService *svc = NULL;
int ret;
...
if (!(svc = virNetServerServiceNewUNIX(path,
mask,
grp,
auth,
tls,
readonly,
max_queued_clients,
nrequests_client_max)))
return -1;
virNetServerAddService(srv, svc);
virObjectUnref(svc);
return 0;
}
virNetServerService *virNetServerServiceNewUNIX(const char *path,
mode_t mask,
gid_t grp,
int auth,
virNetTLSContext *tls,
bool readonly,
size_t max_queued_clients,
size_t nrequests_client_max)
{
virNetServerService *svc;
virNetSocket *sock;
VIR_DEBUG("Creating new UNIX server path='%s' mask=%o gid=%u",
path, mask, grp);
if (virNetSocketNewListenUNIX(path,
mask,
-1,
grp,
&sock) < 0)
return NULL;
svc = virNetServerServiceNewSocket(&sock,
1,
auth,
tls,
readonly,
max_queued_clients,
nrequests_client_max);
virObjectUnref(sock);
return svc;
}
static virNetServerService *
virNetServerServiceNewSocket(virNetSocket **socks,
size_t nsocks,
int auth,
virNetTLSContext *tls,
bool readonly,
size_t max_queued_clients,
size_t nrequests_client_max)
{
virNetServerService *svc;
size_t i;
if (virNetServerServiceInitialize() < 0)
return NULL;
if (!(svc = virObjectNew(virNetServerServiceClass)))
return NULL;
svc->socks = g_new0(virNetSocket *, nsocks);
svc->nsocks = nsocks;
for (i = 0; i < svc->nsocks; i++) {
svc->socks[i] = socks[i];
virObjectRef(svc->socks[i]);
}
svc->auth = auth;
svc->readonly = readonly;
svc->nrequests_client_max = nrequests_client_max;
svc->tls = virObjectRef(tls);
virObjectRef(svc);
svc->timer = virEventAddTimeout(-1, virNetServerServiceTimerFunc,
svc, virObjectUnref);
if (svc->timer < 0) {
virObjectUnref(svc);
goto error;
}
for (i = 0; i < svc->nsocks; i++) {
if (virNetSocketListen(svc->socks[i], max_queued_clients) < 0)
goto error;
/* IO callback is initially disabled, until we're ready
* to deal with incoming clients */
virObjectRef(svc);
if (virNetSocketAddIOCallback(svc->socks[i],
0,
virNetServerServiceAccept,
svc,
virObjectUnref) < 0) {
virObjectUnref(svc);
goto error;
}
}
return svc;
error:
virObjectUnref(svc);
return NULL;
}
int virNetSocketAddIOCallback(virNetSocket *sock,
int events,
virNetSocketIOFunc func,
void *opaque,
virFreeCallback ff)
{
int ret = -1;
virObjectRef(sock);
virObjectLock(sock);
if (sock->watch >= 0) {
VIR_DEBUG("Watch already registered on socket %p", sock);
goto cleanup;
}
if ((sock->watch = virEventAddHandle(sock->fd,
events,
virNetSocketEventHandle,
sock,
virNetSocketEventFree)) < 0) {
VIR_DEBUG("Failed to register watch on socket %p", sock);
goto cleanup;
}
sock->func = func;
sock->opaque = opaque;
sock->ff = ff;
ret = 0;
cleanup:
virObjectUnlock(sock);
if (ret != 0)
virObjectUnref(sock);
return ret;
}
static void virNetSocketEventHandle(int watch G_GNUC_UNUSED,
int fd G_GNUC_UNUSED,
int events,
void *opaque)
{
virNetSocket *sock = opaque;
virNetSocketIOFunc func;
void *eopaque;
virObjectLock(sock);
func = sock->func;
eopaque = sock->opaque;
virObjectUnlock(sock);
if (func)
func(sock, events, eopaque);
}
Putting this together, libvirtd binds the listening socket resource in virNetServerServiceNewSocket() via virNetSocketAddIOCallback(), which, per the libvirt file descriptor event source section above, calls virEventAddHandle() to bind the resource
- When virsh connects to the listening socket, the socket resource becomes ready; libvirtd's event loop catches the event and invokes the registered virNetSocketEventHandle() handler. That handler calls back into virNetServerServiceAccept(), registered in virNetSocketAddIOCallback(), which in turn calls the virNetServerDispatchNewClient() dispatch function that the server registered during initialization in virNetServerAddService(). In virNetServerDispatchNewClient(), libvirt creates the socket for this connection and binds that socket resource to the event loop in virNetServerAddClient(); concretely, per the libvirt file descriptor event source section above, it calls virEventAddHandle() in virNetServerClientRegisterEvent() to bind the resource
//#0 virNetServerDispatchNewClient (svc=0x555555659810, clientsock=0x55555565a3b0, opaque=0x55555564c880) at ../src/rpc/virnetserver.c:331
//#1 0x00007ffff7c01522 in virNetServerServiceAccept (sock=0x55555565a050, events=<optimized out>, opaque=0x555555659810) at ../src/rpc/virnetserverservice.c:102
//#2 0x00007ffff7aed46e in virEventGLibHandleDispatch (fd=<optimized out>, condition=<optimized out>, opaque=0x555555659c80) at ../src/util/vireventglib.c:119
//#3 0x00007ffff7eb9c44 in g_main_context_dispatch () at /lib/x86_64-linux-gnu/libglib-2.0.so.0
//#4 0x00007ffff7f0f2b8 in () at /lib/x86_64-linux-gnu/libglib-2.0.so.0
//#5 0x00007ffff7eb73e3 in g_main_context_iteration () at /lib/x86_64-linux-gnu/libglib-2.0.so.0
//#6 0x00007ffff7aee074 in virEventGLibRunOnce () at ../src/util/vireventglib.c:515
//#7 0x00007ffff7b4d349 in virEventRunDefaultImpl () at ../src/util/virevent.c:362
//#8 0x00007ffff7c05ed3 in virNetDaemonRun (dmn=0x55555564b870, dmn@entry=0x55555564c960) at ../src/rpc/virnetdaemon.c:837
//#9 0x000055555558d952 in main (argc=argc@entry=1, argv=argv@entry=0x7fffffffdf58) at ../src/remote/remote_daemon.c:1214
//#10 0x00007ffff7429d90 in __libc_start_call_main (main=main@entry=0x55555558c700 <main>, argc=argc@entry=1, argv=argv@entry=0x7fffffffdf58) at ../sysdeps/nptl/libc_start_call_main.h:58
//#11 0x00007ffff7429e40 in __libc_start_main_impl (main=0x55555558c700 <main>, argc=1, argv=0x7fffffffdf58, init=<optimized out>, fini=<optimized out>, rtld_fini=<optimized out>, stack_end=0x7fffffffdf48) at ../csu/libc-start.c:392
//#12 0x000055555558e2b5 in _start ()
static void virNetSocketEventHandle(int watch G_GNUC_UNUSED,
int fd G_GNUC_UNUSED,
int events,
void *opaque)
{
virNetSocket *sock = opaque;
virNetSocketIOFunc func;
void *eopaque;
virObjectLock(sock);
func = sock->func;
eopaque = sock->opaque;
virObjectUnlock(sock);
if (func)
func(sock, events, eopaque);
}
static void virNetServerServiceAccept(virNetSocket *sock,
int events G_GNUC_UNUSED,
void *opaque)
{
virNetServerService *svc = opaque;
virNetSocket *clientsock = NULL;
int rc;
rc = virNetSocketAccept(sock, &clientsock);
if (rc < 0) {
if (rc == -2) {
/* Could not accept new client due to EMFILE. Suspend listening on
* the socket and set up a timer to enable it later. Hopefully,
* some FDs will be closed meanwhile. */
VIR_DEBUG("Temporarily suspending listening on svc=%p because accept() on sock=%p failed (errno=%d)",
svc, sock, errno);
virNetServerServiceToggle(svc, false);
svc->timerActive = true;
/* Retry in 5 seconds. */
virEventUpdateTimeout(svc->timer, 5 * 1000);
}
goto cleanup;
}
if (!clientsock) /* Connection already went away */
goto cleanup;
if (!svc->dispatchFunc)
goto cleanup;
svc->dispatchFunc(svc, clientsock, svc->dispatchOpaque);
cleanup:
virObjectUnref(clientsock);
}
static int
virNetServerDispatchNewClient(virNetServerService *svc,
virNetSocket *clientsock,
void *opaque)
{
virNetServer *srv = opaque;
g_autoptr(virNetServerClient) client = NULL;
if (!(client = virNetServerClientNew(virNetServerNextClientID(srv),
clientsock,
virNetServerServiceGetAuth(svc),
virNetServerServiceIsReadonly(svc),
virNetServerServiceGetMaxRequests(svc),
virNetServerServiceGetTLSContext(svc),
srv->clientPrivNew,
srv->clientPrivPreExecRestart,
srv->clientPrivFree,
srv->clientPrivOpaque)))
return -1;
if (virNetServerAddClient(srv, client) < 0) {
virNetServerClientClose(client);
return -1;
}
return 0;
}
int
virNetServerAddClient(virNetServer *srv,
virNetServerClient *client)
{
VIR_LOCK_GUARD lock = virObjectLockGuard(srv);
if (virNetServerClientInit(client) < 0)
return -1;
VIR_EXPAND_N(srv->clients, srv->nclients, 1);
srv->clients[srv->nclients-1] = virObjectRef(client);
VIR_WITH_OBJECT_LOCK_GUARD(client) {
if (virNetServerClientIsAuthPendingLocked(client))
virNetServerTrackPendingAuthLocked(srv);
}
virNetServerCheckLimits(srv);
virNetServerClientSetDispatcher(client, virNetServerDispatchNewMessage, srv);
if (virNetServerClientInitKeepAlive(client, srv->keepaliveInterval,
srv->keepaliveCount) < 0)
return -1;
return 0;
}
int virNetServerClientInit(virNetServerClient *client)
{
VIR_LOCK_GUARD lock = virObjectLockGuard(client);
int ret = -1;
if (!client->tlsCtxt) {
/* Plain socket, so prepare to read first message */
if (virNetServerClientRegisterEvent(client) < 0)
goto error;
return 0;
}
if (!(client->tls = virNetTLSSessionNew(client->tlsCtxt, NULL)))
goto error;
virNetSocketSetTLSSession(client->sock, client->tls);
/* Begin the TLS handshake. */
VIR_WITH_OBJECT_LOCK_GUARD(client->tlsCtxt) {
ret = virNetTLSSessionHandshake(client->tls);
}
if (ret == 0) {
/* Unlikely, but ... Next step is to check the certificate. */
if (virNetServerClientCheckAccess(client) < 0)
goto error;
/* Handshake & cert check OK, so prepare to read first message */
if (virNetServerClientRegisterEvent(client) < 0)
goto error;
} else if (ret > 0) {
/* Most likely, need to do more handshake data */
if (virNetServerClientRegisterEvent(client) < 0)
goto error;
} else {
goto error;
}
return 0;
error:
client->wantClose = true;
return -1;
}
/*
* @server: a locked or unlocked server object
* @client: a locked client object
*/
static int virNetServerClientRegisterEvent(virNetServerClient *client)
{
int mode = virNetServerClientCalculateHandleMode(client);
if (!client->sock)
return -1;
virObjectRef(client);
VIR_DEBUG("Registering client event callback %d", mode);
if (virNetSocketAddIOCallback(client->sock,
mode,
virNetServerClientDispatchEvent,
client,
virObjectUnref) < 0) {
virObjectUnref(client);
return -1;
}
return 0;
}
- When virsh sends an RPC message over that socket, the socket becomes ready; libvirtd's event loop catches the event and invokes the registered virNetServerClientDispatchEvent() handler. That handler calls the virNetServerDispatchNewMessage() dispatch function set when the client was initialized in virNetServerAddClient(), which wakes up the RPC thread pool via virThreadPoolSendJob() and finally completes the handling of the RPC message
//#0 virNetServerDispatchNewMessage (client=0x555555660050, msg=0x55555564ad80, opaque=0x55555564c880) at ../src/rpc/virnetserver.c:198
//#1 0x00007ffff7aed46e in virEventGLibHandleDispatch (fd=<optimized out>, condition=<optimized out>, opaque=0x5555556431b0) at ../src/util/vireventglib.c:119
//#2 0x00007ffff7eb9c44 in g_main_context_dispatch () at /lib/x86_64-linux-gnu/libglib-2.0.so.0
//#3 0x00007ffff7f0f2b8 in () at /lib/x86_64-linux-gnu/libglib-2.0.so.0
//#4 0x00007ffff7eb73e3 in g_main_context_iteration () at /lib/x86_64-linux-gnu/libglib-2.0.so.0
//#5 0x00007ffff7aee074 in virEventGLibRunOnce () at ../src/util/vireventglib.c:515
//#6 0x00007ffff7b4d349 in virEventRunDefaultImpl () at ../src/util/virevent.c:362
//#7 0x00007ffff7c05ed3 in virNetDaemonRun (dmn=0x55555564b870, dmn@entry=0x55555564c960) at ../src/rpc/virnetdaemon.c:837
//#8 0x000055555558d952 in main (argc=argc@entry=1, argv=argv@entry=0x7fffffffdf58) at ../src/remote/remote_daemon.c:1214
//#9 0x00007ffff7429d90 in __libc_start_call_main (main=main@entry=0x55555558c700 <main>, argc=argc@entry=1, argv=argv@entry=0x7fffffffdf58) at ../sysdeps/nptl/libc_start_call_main.h:58
//#10 0x00007ffff7429e40 in __libc_start_main_impl (main=0x55555558c700 <main>, argc=1, argv=0x7fffffffdf58, init=<optimized out>, fini=<optimized out>, rtld_fini=<optimized out>, stack_end=0x7fffffffdf48) at ../csu/libc-start.c:392
//#11 0x000055555558e2b5 in _start ()
static void
virNetServerClientDispatchEvent(virNetSocket *sock, int events, void *opaque)
{
virNetServerClient *client = opaque;
virNetMessage *msg = NULL;
VIR_WITH_OBJECT_LOCK_GUARD(client) {
if (client->sock != sock) {
virNetSocketRemoveIOCallback(sock);
return;
}
if (events & (VIR_EVENT_HANDLE_WRITABLE | VIR_EVENT_HANDLE_READABLE)) {
if (client->tls &&
virNetTLSSessionGetHandshakeStatus(client->tls) !=
VIR_NET_TLS_HANDSHAKE_COMPLETE) {
virNetServerClientDispatchHandshake(client);
} else {
if (events & VIR_EVENT_HANDLE_WRITABLE)
virNetServerClientDispatchWrite(client);
if ((events & VIR_EVENT_HANDLE_READABLE) && client->rx)
msg = virNetServerClientDispatchRead(client);
}
}
/* NB, will get HANGUP + READABLE at same time upon disconnect */
if (events & (VIR_EVENT_HANDLE_ERROR | VIR_EVENT_HANDLE_HANGUP))
client->wantClose = true;
}
if (msg)
virNetServerClientDispatchMessage(client, msg);
}
static void virNetServerClientDispatchMessage(virNetServerClient *client,
virNetMessage *msg)
{
VIR_WITH_OBJECT_LOCK_GUARD(client) {
if (!client->dispatchFunc) {
virNetMessageFree(msg);
client->wantClose = true;
return;
}
}
/* Accessing 'client' is safe, because virNetServerClientSetDispatcher
* only permits setting 'dispatchFunc' once, so if non-NULL, it will
* never change again
*/
client->dispatchFunc(client, msg, client->dispatchOpaque);
}
static void
virNetServerDispatchNewMessage(virNetServerClient *client,
virNetMessage *msg,
void *opaque)
{
virNetServer *srv = opaque;
virNetServerProgram *prog = NULL;
unsigned int priority = 0;
VIR_DEBUG("server=%p client=%p message=%p",
srv, client, msg);
VIR_WITH_OBJECT_LOCK_GUARD(srv) {
prog = virNetServerGetProgramLocked(srv, msg);
/* we can unlock @srv since @prog can only become invalid in case
* of disposing @srv, but let's grab a ref first to ensure nothing
* disposes of it before we use it. */
virObjectRef(srv);
}
if (virThreadPoolGetMaxWorkers(srv->workers) > 0) {
virNetServerJob *job;
job = g_new0(virNetServerJob, 1);
job->client = virObjectRef(client);
job->msg = msg;
if (prog) {
job->prog = virObjectRef(prog);
priority = virNetServerProgramGetPriority(prog, msg->header.proc);
}
if (virThreadPoolSendJob(srv->workers, priority, job) < 0) {
virObjectUnref(client);
VIR_FREE(job);
virObjectUnref(prog);
goto error;
}
} else {
if (virNetServerProcessMsg(srv, client, prog, msg) < 0)
goto error;
}
virObjectUnref(srv);
return;
error:
virNetMessageFree(msg);
virNetServerClientClose(client);
virObjectUnref(srv);
}
- As analyzed above, once the RPC thread pool has processed the RPC message it returns the result with virNetServerClientSendMessage(). In virNetServerClientSendMessageLocked() the reply is pushed onto client->tx and virNetServerClientUpdateEvent() adds the VIR_EVENT_HANDLE_WRITABLE flag to the file descriptor event source. Per the libvirt file descriptor event source section above, the event source also fires when the file descriptor becomes writable, so virNetServerClientDispatchEvent() eventually calls virNetServerClientDispatchWrite() to send the RPC result back to virsh
int virNetServerClientSendMessage(virNetServerClient *client,
virNetMessage *msg)
{
VIR_LOCK_GUARD lock = virObjectLockGuard(client);
return virNetServerClientSendMessageLocked(client, msg);
}
static int
virNetServerClientSendMessageLocked(virNetServerClient *client,
virNetMessage *msg)
{
int ret = -1;
VIR_DEBUG("msg=%p proc=%d len=%zu offset=%zu",
msg, msg->header.proc,
msg->bufferLength, msg->bufferOffset);
msg->donefds = 0;
if (client->sock && !client->wantClose) {
PROBE(RPC_SERVER_CLIENT_MSG_TX_QUEUE,
"client=%p len=%zu prog=%u vers=%u proc=%u type=%u status=%u serial=%u",
client, msg->bufferLength,
msg->header.prog, msg->header.vers, msg->header.proc,
msg->header.type, msg->header.status, msg->header.serial);
virNetMessageQueuePush(&client->tx, msg);
virNetServerClientUpdateEvent(client);
ret = 0;
}
return ret;
}
/*
* @client: a locked client object
*/
static void virNetServerClientUpdateEvent(virNetServerClient *client)
{
int mode;
if (!client->sock)
return;
mode = virNetServerClientCalculateHandleMode(client);
virNetSocketUpdateIOCallback(client->sock, mode);
if (client->rx && virNetSocketHasCachedData(client->sock))
virEventUpdateTimeout(client->sockTimer, 0);
}
/*
* @client: a locked client object
*/
static int
virNetServerClientCalculateHandleMode(virNetServerClient *client)
{
int mode = 0;
VIR_DEBUG("tls=%p hs=%d, rx=%p tx=%p",
client->tls,
client->tls ? virNetTLSSessionGetHandshakeStatus(client->tls) : -1,
client->rx,
client->tx);
if (!client->sock || client->wantClose)
return 0;
if (client->tls) {
switch (virNetTLSSessionGetHandshakeStatus(client->tls)) {
case VIR_NET_TLS_HANDSHAKE_RECVING:
mode |= VIR_EVENT_HANDLE_READABLE;
break;
case VIR_NET_TLS_HANDSHAKE_SENDING:
mode |= VIR_EVENT_HANDLE_WRITABLE;
break;
default:
case VIR_NET_TLS_HANDSHAKE_COMPLETE:
if (client->rx)
mode |= VIR_EVENT_HANDLE_READABLE;
if (client->tx)
mode |= VIR_EVENT_HANDLE_WRITABLE;
}
} else {
/* If there is a message on the rx queue, and
* we're not in middle of a delayedClose, then
* we're wanting more input */
if (client->rx && !client->delayedClose)
mode |= VIR_EVENT_HANDLE_READABLE;
/* If there are one or more messages to send back to client,
then monitor for writability on socket */
if (client->tx)
mode |= VIR_EVENT_HANDLE_WRITABLE;
}
VIR_DEBUG("mode=0%o", mode);
return mode;
}
//#0 0x00007ffff7c022f3 in virNetServerClientDispatchWrite (client=<optimized out>) at ../src/rpc/virnetserverclient.c:1356
//#1 virNetServerClientDispatchEvent (sock=<optimized out>, events=1, opaque=0x555555660120) at ../src/rpc/virnetserverclient.c:1430
//#2 0x00007ffff7aed46e in virEventGLibHandleDispatch (fd=<optimized out>, condition=<optimized out>, opaque=0x555555663490) at ../src/util/vireventglib.c:119
//#3 0x00007ffff7eb9c44 in g_main_context_dispatch () at /lib/x86_64-linux-gnu/libglib-2.0.so.0
//#4 0x00007ffff7f0f2b8 in () at /lib/x86_64-linux-gnu/libglib-2.0.so.0
//#5 0x00007ffff7eb73e3 in g_main_context_iteration () at /lib/x86_64-linux-gnu/libglib-2.0.so.0
//#6 0x00007ffff7aee074 in virEventGLibRunOnce () at ../src/util/vireventglib.c:515
//#7 0x00007ffff7b4d349 in virEventRunDefaultImpl () at ../src/util/virevent.c:362
//#8 0x00007ffff7c05ed3 in virNetDaemonRun (dmn=0x55555564b870, dmn@entry=0x55555564c960) at ../src/rpc/virnetdaemon.c:837
//#9 0x000055555558d952 in main (argc=argc@entry=1, argv=argv@entry=0x7fffffffdf58) at ../src/remote/remote_daemon.c:1214
//#10 0x00007ffff7429d90 in __libc_start_call_main (main=main@entry=0x55555558c700 <main>, argc=argc@entry=1, argv=argv@entry=0x7fffffffdf58) at ../sysdeps/nptl/libc_start_call_main.h:58
//#11 0x00007ffff7429e40 in __libc_start_main_impl (main=0x55555558c700 <main>, argc=1, argv=0x7fffffffdf58, init=<optimized out>, fini=<optimized out>, rtld_fini=<optimized out>, stack_end=0x7fffffffdf48) at ../csu/libc-start.c:392
//#12 0x000055555558e2b5 in _start ()
static void
virNetServerClientDispatchEvent(virNetSocket *sock, int events, void *opaque)
{
virNetServerClient *client = opaque;
virNetMessage *msg = NULL;
VIR_WITH_OBJECT_LOCK_GUARD(client) {
if (client->sock != sock) {
virNetSocketRemoveIOCallback(sock);
return;
}
if (events & (VIR_EVENT_HANDLE_WRITABLE | VIR_EVENT_HANDLE_READABLE)) {
if (client->tls &&
virNetTLSSessionGetHandshakeStatus(client->tls) !=
VIR_NET_TLS_HANDSHAKE_COMPLETE) {
virNetServerClientDispatchHandshake(client);
} else {
if (events & VIR_EVENT_HANDLE_WRITABLE)
virNetServerClientDispatchWrite(client);
if ((events & VIR_EVENT_HANDLE_READABLE) && client->rx)
msg = virNetServerClientDispatchRead(client);
}
}
/* NB, will get HANGUP + READABLE at same time upon disconnect */
if (events & (VIR_EVENT_HANDLE_ERROR | VIR_EVENT_HANDLE_HANGUP))
client->wantClose = true;
}
if (msg)
virNetServerClientDispatchMessage(client, msg);
}
/*
* Process all queued client->tx messages until
* we would block on I/O
*/
static void
virNetServerClientDispatchWrite(virNetServerClient *client)
{
while (client->tx) {
if (client->tx->bufferOffset < client->tx->bufferLength) {
ssize_t ret;
ret = virNetServerClientWrite(client);
if (ret < 0) {
client->wantClose = true;
return;
}
if (ret == 0)
return; /* Would block on write EAGAIN */
}
if (client->tx->bufferOffset == client->tx->bufferLength) {
virNetMessage *msg;
size_t i;
for (i = client->tx->donefds; i < client->tx->nfds; i++) {
int rv;
if ((rv = virNetSocketSendFD(client->sock, client->tx->fds[i])) < 0) {
client->wantClose = true;
return;
}
if (rv == 0) /* Blocking */
return;
client->tx->donefds++;
}
/* Completed this 'tx' operation, so now read for all
* future rx/tx to be under a SASL SSF layer
*/
if (client->sasl) {
virNetSocketSetSASLSession(client->sock, client->sasl);
g_clear_pointer(&client->sasl, virObjectUnref);
}
/* Get finished msg from head of tx queue */
msg = virNetMessageQueueServe(&client->tx);
if (msg->tracked) {
client->nrequests--;
/* See if the recv queue is currently throttled */
if (!client->rx &&
client->nrequests < client->nrequests_max) {
/* Ready to recv more messages */
virNetMessageClear(msg);
msg->bufferLength = VIR_NET_MESSAGE_LEN_MAX;
msg->buffer = g_new0(char, msg->bufferLength);
client->rx = g_steal_pointer(&msg);
client->nrequests++;
}
}
virNetMessageFree(msg);
virNetServerClientUpdateEvent(client);
if (client->delayedClose)
client->wantClose = true;
}
}
}