抱歉,您的浏览器无法访问本站
本页面需要浏览器支持(启用)JavaScript
了解详情 >

SPDK 自带的性能测试应用 spdk_nvme_perf 的源代码 perf.c 的粗浅分析,进一步熟悉 SPDK 用户态驱动的主要工作流程和方式。

在看代码的过程中有些不懂的地方在网上查资料后学习了,记录在了 【学习笔记】SPDK(四):补充知识记录 中。

简介

perf 是 SPDK 用来测试 NVMe SSD 性能的工具,最新版本的 SPDK 中 perf 源代码在 spdk/app/spdk_nvme_perf/ 路径下。perf 主要用来测试 NVMe SSD 的 IOPS,Bandwidth 和 Latency,它既可以测本地的 target,也可以测远端的 target。

perf 主流程


perf 资源关系图


函数调用栈说明:

  • 相同缩进表示同一层的函数执行;不同缩进表示存在函数内调用另一个函数。

  • 花括号表示循环体。

  • 函数结尾 ; 表示该函数结束;: 表示进入该函数,接下来存在函数调用。

所有函数的调用可以利用 SPDK 的 debug,通过 -G 参数开启,不过没有函数的调用层次。


(一)控制流(初始化)

流程图:

函数调用栈:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
spdk_env_opts_init(&opts);
parse_args(...):
while {
getopt_long(..., g_perf_cmdline_opts, ...);
add_trid(...);
... // 创建了 trid 和 trid_entry
}
spdk_env_init(&opts);
register_workers():
SPDK_ENV_FOREACH_CORE(i) {
worker = calloc(...);
}
TAILQ_INIT(&worker->ns_ctx);
TAILQ_INSERT_TAIL(&g_workers, worker, link);
register_controllers():
TAILQ_FOREACH(trid_entry, &g_trid_list, tailq) {
spdk_nvme_probe(&trid_entry->trid, trid_entry, probe_cb, attach_cb, NULL):
probe_ctx = spdk_nvme_probe_async(...): // 注意异步执行
nvme_driver_init();
probe_ctx = calloc(...);
nvme_probe_ctx_init(probe_ctx, ...):
TAILQ_INIT(&probe_ctx->init_ctrlrs);
nvme_probe_internal(probe_ctx, false):
nvme_rdma.c: nvme_fabric_ctrlr_scan(probe_ctx, ...):
// 该函数调用见后文、构造 ctrlr 函数
nvme_init_controllers(probe_ctx):
while (true) { // 注意这里有一个无限 while 循环,意味着 nvme_ctrlr_process_init() 在一直循环执行
spdk_nvme_probe_poll_async(probe_ctx): // 注意异步执行
TAILQ_FOREACH_SAFE(ctrlr, &probe_ctx->init_ctrlrs, ...) {
nvme_ctrlr_poll_internal(ctrlr, probe_ctx):
/** 处理 ctrlr 状态转移 **/
nvme_ctrlr_process_init(ctrlr):
// 该函数调用见后文、处理状态转移函数
TAILQ_REMOVE(&probe_ctx->init_ctrlrs, ctrlr, ...);
TAILQ_INSERT_TAIL(&g_nvme_attached_ctrlrs, ctrlr, tailq);
perf.c: attach_cb(...):
register_ctrlr(ctrlr, trid_entry):
build_nvme_name(ctrlr_entry->name, ..., ctrlr);
TAILQ_INSERT_TAIL(&g_controllers, ctrlr_entry, ...);
foreach (nsid) {
ns = spdk_nvme_ctrlr_get_ns(ctrlr, nsid);
register_ns(ctrlr, ns);
... // 进行一系列 io 检查和验证,保证可以正常进行 io 操作
ns_entry->u.nvme.ctrlr = ctrlr;
ns_entry->u.nvme.ns = ns;
build_nvme_ns_name(ns_entry->name, ..., ctrlr, ...);
TAILQ_INSERT_TAIL(&g_namespaces, entry, link);
}
}
}
}

初始化大体步骤:

  • 解析输入参数,初始化 SPDK、DPDK 环境;
  • 创建、注册 SPDK 工作线程;
  • 通过解析参数得到的 transport id、进行异步设备探测、通过状态机获取设备(控制器、NS 等)信息、注册设备(创建 entry 通过尾队列维护)等。

注意异步以及控制器设备注册时的状态机。这一部分也会发送少量请求。

并且刚开始的 trid 并不包含任何真正信息,作用就是给程序指明参数提供了有哪些 trid 需要去探测。


(二)控制流(初始化 - fabric_ctrlr_scan)

流程图:

函数调用栈:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
nvme_rdma.c: nvme_fabric_ctrlr_scan(probe_ctx, ...):
spdk_nvme_ctrlr_get_default_ctrlr_opts(&discovery_opts, ...):
SET_FIELD(...);
/** discovery_ctrlr 识别过程、没有获得 ns、subnqn 等过程 **/
discovery_ctrlr = nvme_rdma_ctrlr_construct(&probe_ctx->trid, &discovery_opts, ...):
... // 构造 rdma_ctrlr 设置部分字段、建立事件通道等
nvme_ctrlr_construct(&rctrlr->ctrlr):
nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_INIT, NVME_TIMEOUT_INFINITE);
... // 设置 discovery_ctrlr 部分字段
TAILQ_INIT(&ctrlr->active_io_qpairs);
TAILQ_INIT(&ctrlr->active_procs);
RB_INIT(&ctrlr->ns);
rctrlr->ctrlr.adminq = nvme_rdma_ctrlr_create_qpair(&rctrlr->ctrlr, ..., async = true):
rqpair->state = NVME_RDMA_QPAIR_STATE_INVALID; // 创建了 rqpair、设置状态为 INVALID
qpair = &rqpair->qpair;
nvme_qpair_init(qpair, qid, ctrlr, qprio, num_requests, async):
... // 设置 qpair 部分字段
qpair->ctrlr = ctrlr;
qpair->trtype = ctrlr->trid.trtype;
qpair->async = async;
STAILQ_INIT(&qpair->free_req);
for {
STAILQ_INSERT_HEAD(&qpair->free_req, req, stailq);
}
nvme_ctrlr_add_process(&rctrlr->ctrlr, 0):
... // 设置 discovery ctrlr_proc 字段
TAILQ_INSERT_TAIL(&ctrlr->active_procs, ctrlr_proc, tailq);
while (discovery_ctrlr->state != NVME_CTRLR_STATE_READY) {
nvme_ctrlr_process_init(discovery_ctrlr):
// 该函数调用见后文、处理状态转移函数
}
nvme_ctrlr_cmd_identify(discovery_ctrlr, ..., &discovery_ctrlr->cdata, ...);
nvme_wait_for_completion(discovery_ctrlr->adminq, ...);
/** io ctrlr 识别过程、完整流程 **/
nvme_fabric_ctrlr_discover(discovery_ctrlr, probe_ctx):
while {
nvme_fabric_get_discovery_log_page(discovery_ctrlr, buffer, ...):
spdk_nvme_ctrlr_cmd_get_log_page(...);
nvme_wait_for_completion(discovery_ctrlr->adminq, status);
for {
nvme_fabric_discover_probe(log_page_entry++, probe_ctx, discovery_ctrlr->trid.priority):
... // 通过 log_page_entry 设置 trid
nvme_ctrlr_probe(&trid, probe_ctx, NULL):
spdk_nvme_ctrlr_get_default_ctrlr_opts(&discovery_opts, ...):
SET_FIELD(...);
/** 这里构建了 io ctrlr **/
ctrlr = nvme_rdma_ctrlr_construct(trid, &opts, devhandle = NULL):
rctrlr = spdk_zmalloc(...);
... // 获取 rdma 设备、设备属性等,设置 rctrlr 部分字段
nvme_ctrlr_construct(&rctrlr->ctrlr):
nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_INIT, NVME_TIMEOUT_INFINITE);
... // 设置 ctrlr 部分字段
TAILQ_INIT(&ctrlr->active_io_qpairs);
TAILQ_INIT(&ctrlr->active_procs);
RB_INIT(&ctrlr->ns);
... // rctrlr 初始化 cm_events 队列
// 以下两步在建立 rdma cm 通道,在后面建立 rdma qpairs 连接时用到
rctrlr->cm_channel = rdma_create_event_channel();
flag = fcntl(rctrlr->cm_channel->fd, F_GETFL);
// 创建 admin qpair,qid = 0 为 admin queue
rctrlr->ctrlr.adminq = nvme_rdma_ctrlr_create_qpair(&rctrlr->ctrlr, qid = 0, ..., async = true):
rqpair->state = NVME_RDMA_QPAIR_STATE_INVALID; // 创建了 rqpair、设置状态为 INVALID
qpair = &rqpair->qpair;
nvme_qpair_init(qpair, qid, ctrlr, qprio, num_requests, async):
... // 设置 qpair 部分字段
qpair->ctrlr = ctrlr;
qpair->trtype = ctrlr->trid.trtype;
qpair->async = async;
STAILQ_INIT(&qpair->free_req);
for {
STAILQ_INSERT_HEAD(&qpair->free_req, req, stailq);
}
nvme_ctrlr_add_process(&rctrlr->ctrlr, 0):
... // 设置 ctrlr_proc 字段
TAILQ_INSERT_TAIL(&ctrlr->active_procs, ctrlr_proc, tailq);
}
nvme_qpair_set_state(ctrlr->adminq, NVME_QPAIR_ENABLED);
TAILQ_INSERT_TAIL(&probe_ctx->init_ctrlrs, ctrlr, tailq);
}

这部分是扫描创建 ctrlr 核心函数。大体步骤:

  • 创建 discovery_ctrlr 和其 Admin QP,通过不完全状态机,作用是获取 discovery_log_page
  • discovery_log_page 其中包含了 ctrlr 真正有用的信息如 tridsubnqn 等,用来构建 io_ctrlr
  • io_ctrlr 经过完全状态机的状态转移,直到 READY 状态,加入到 init_ctrlrs 队列中。

注意 ctrlr 的状态机,不同的状态执行不同的函数,如识别 NS、获取 Data 等,会通过 Admin QP 发送 identify 请求。


(三)控制流(初始化 - ctrlr_process_init)

Controller 状态转移过程。

流程图:

函数调用栈:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
 /** This function will be called repeatedly during initialization 
until the controller is ready. */
nvme_ctrlr_process_init(ctrlr):
// 状态 CONNECT_ADMINQ:
nvme_transport_ctrlr_connect_qpair(ctrlr, ctrlr->adminq):
nvme_qpair_set_state(qpair, NVME_QPAIR_CONNECTING);
... // 与建立 io qpairs 流程类似,见后文
// 状态 WAIT_FOR_CONNECT_ADMINQ:
spdk_nvme_qpair_process_completions(ctrlr->adminq, 0):
nvme_transport_qpair_process_completions(qpair, max_completions):
nvme_rdma_qpair_process_completions(qpair, max_completions):
TODO
switch (ctrlr->adminq->state) {
// QPAIR_CONNECTING:
break;
// QPAIR_CONNECTED:
nvme_qpair_set_state(ctrlr->adminq, NVME_QPAIR_ENABLED);
// QPAIR_ENABLED:
nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_READ_VS, NVME_TIMEOUT_INFINITE);
nvme_qpair_abort_queued_reqs(ctrlr->adminq);
}
...
// 状态 RESET_ADMIN_QUEUE:
nvme_transport_qpair_reset(ctrlr->adminq):
// do nothing
nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_IDENTIFY, ...);
// 状态 IDENTIFY:
nvme_ctrlr_identify(ctrlr):
nvme_ctrlr_cmd_identify(ctrlr, ..., &ctrlr->cdata, nvme_ctrlr_identify_done, ctrlr);
nvme_ctrlr_identify_done(ctrlr);
...
// 状态 SET_KEEP_ALIVE_TIMEOUT:
nvme_ctrlr_set_keep_alive_timeout(ctrlr):
nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_WAIT_FOR_KEEP_ALIVE_TIMEOUT, ...);
spdk_nvme_ctrlr_cmd_get_feature(ctrlr, ..., nvme_ctrlr_set_keep_alive_timeout_done, ctrlr);
nvme_ctrlr_set_keep_alive_timeout_done(ctrlr):
spdk_nvme_ctrlr_is_discovery(ctrlr); // 判断是否为 dicovery_ctrlr
if (discovery_ctrlr): nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_READY, NVME_TIMEOUT_INFINITE);
else if (io_ctrlr): nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_IDENTIFY_IOCS_SPECIFIC, ...);
// 状态 IDENTIFY_IOCS_SPECIFIC:
nvme_ctrlr_identify_iocs_specific(ctrlr):
nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_WAIT_FOR_IDENTIFY_IOCS_SPECIFIC, ...);
nvme_ctrlr_cmd_identify(ctrlr, ..., ctrlr->cdata_zns, nvme_ctrlr_identify_zns_specific_done, ctrlr);
nvme_ctrlr_identify_zns_specific_done(ctrlr):
nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_SET_NUM_QUEUES, ...);
// 状态 SET_NUM_QUEUES:
nvme_ctrlr_update_nvmf_ioccsz(ctrlr);
nvme_ctrlr_set_num_queues(ctrlr):
nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_WAIT_FOR_SET_NUM_QUEUES, ...);
nvme_ctrlr_cmd_set_num_queues(ctrlr, ctrlr->opts.num_io_queues, nvme_ctrlr_set_num_queues_done, ctrlr);
nvme_ctrlr_set_num_queues_done(ctrlr):
for (i = 1; i <= ctrlr->opts.num_io_queues) {
// 初始化 free io queue IDs 列表(bit array)
// QID 0 是 admin queue,已经 implicitly allocated
spdk_nvme_ctrlr_free_qid(ctrlr, i);
}
nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_IDENTIFY_ACTIVE_NS, ...);
// 状态 IDENTIFY_ACTIVE_NS:
_nvme_ctrlr_identify_active_ns(ctrlr):
active_ns_ctx = nvme_active_ns_ctx_create(ctrlr, _nvme_active_ns_ctx_deleter); // 绑定 deleter
nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_WAIT_FOR_IDENTIFY_ACTIVE_NS, ...);
nvme_ctrlr_identify_active_ns_async(active_ns_ctx): // 异步处理
nvme_ctrlr_cmd_identify(ctrlr, ..., &active_ns_ctx->new_ns_list[1024 * (active_ns_ctx->page_count - 1)], nvme_ctrlr_identify_active_ns_async_done, active_ns_ctx);
nvme_ctrlr_identify_active_ns_async_done(active_ns_ctx):
_nvme_active_ns_ctx_deleter(): // 函数指针
nvme_ctrlr_identify_active_ns_swap(ctrlr, ctx->new_ns_list, ...):
foreach (active_nsid) {
nvme_ctrlr_construct_namespace(ctrlr, nsid):
ns = spdk_nvme_ctrlr_get_ns(ctrlr, nsid):
RB_INSERT(nvme_ns_tree, &ctrlr->ns, ns);
}
nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_IDENTIFY_NS, ...);
// 状态 IDENTIFY_NS:
nvme_ctrlr_identify_namespaces(ctrlr):
... // 获取 nsid、active_ns、active_ns 绑定 ctrlr
nvme_ctrlr_identify_ns_async(ns):
nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_WAIT_FOR_IDENTIFY_NS, ...);
nvme_ctrlr_cmd_identify(ns->ctrlr, ..., &ns->nsdata, nvme_ctrlr_identify_ns_async_done, ns);
nvme_ctrlr_identify_ns_async_done(ns):
nvme_ns_set_identify_data(ns);
... // 获取下一个 nsid、active_ns、active_ns 绑定 ctrlr
if (ns == NULL): nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_IDENTIFY_ID_DESCS, ...);
else: nvme_ctrlr_identify_ns_async(ns):
... // 循环执行直到 ns == NULL
// 状态 IDENTIFY_ID_DESCS:
nvme_ctrlr_identify_id_desc_namespaces(ctrlr);
...
// 状态 IDENTIFY_NS_IOCS_SPECIFIC:
nvme_ctrlr_identify_namespaces_iocs_specific(ctrlr);
...
// 状态 TRANSPORT_READY:
nvme_transport_ctrlr_ready(ctrlr); // 没有找到实现函数
nvme_ctrlr_set_state(ctrlr, NVME_CTRLR_STATE_READY, NVME_TIMEOUT_INFINITE);
// 状态 NVME_CTRLR_STATE_READY.
// 状态 WAIT_FOR_XXX 统一处理:(通过 callback 执行转换下一个状态操作)
spdk_nvme_qpair_process_completions(ctrlr->adminq, 0):
nvme_transport_qpair_process_completions(qpair, max_completions):
nvme_rdma_qpair_process_completions(qpair, max_completions):
TODO

不同状态执行不同的函数,函数执行完毕后会手动改变 ctrlr 的状态,从而使 ctrlr 进入下一个状态。


(四)控制流(任务执行)

流程图:

函数调用栈:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
/** 创建轮询线程执行任务 **/
pthread_create(&thread_id, NULL, &nvme_poll_ctrlrs, NULL);
// 进入轮询线程:
nvme_poll_ctrlrs():
while (true) {
foreach (entry, &g_controllers, link) {
spdk_nvme_ctrlr_process_admin_completions(entry->ctrlr):
nvme_ctrlr_keep_alive(ctrlr); // 发送 keep alive 指令
nvme_io_msg_process(ctrlr): // 处理 ctrlr io 消息
if (ctrlr->needs_io_msg_update): nvme_io_msg_ctrlr_update(ctrlr):
STAILQ_FOREACH(io_msg_producer, &ctrlr->io_producers, link) {
io_msg_producer->update(ctrlr);
}
spdk_nvme_qpair_process_completions(ctrlr->external_io_msgs_qpair, 0):
nvme_transport_qpair_process_completions(qpair, max_completions):
nvme_rdma_qpair_process_completions(qpair, max_completions):
TODO
count = spdk_ring_dequeue(ctrlr->external_io_msgs, requests, ...); // 从外部 I/O 消息环中按需排队获取消息,存储在 requests 数组中
for (i < count) {
spdk_nvme_io_msg *io = requests[i];
io->fn(io->ctrlr, io->nsid, io->arg); // 调用 io 消息处理函数
}
spdk_nvme_qpair_process_completions(ctrlr->adminq, 0):
nvme_transport_qpair_process_completions(qpair, max_completions):
nvme_rdma_qpair_process_completions(qpair, max_completions):
TODO
if (active_proc): nvme_ctrlr_complete_queued_async_events(ctrlr); // 如果当前进程在 ctrlr->active_procs 中,则处理 async_events
}
}

// 回到主线程 main():
associate_workers_with_ns():
for (max(g_num_namespaces, g_num_workers)) {
allocate_ns_worker(ns_entry, worker):
ns_ctx->entry = entry;
TAILQ_INSERT_TAIL(&worker->ns_ctx, ns_ctx, link); // worker_ns_ctx 加入 worker->ns_ctx 队列中
}
pthread_barrier_init(&g_worker_sync_barrier, NULL, g_num_workers); // barrier 线程同步,当所有 thread 都执行到 barrier_wait() 后,才继续执行后续代码
TAILQ_FOREACH(worker, &g_workers, link) {
if (worker->lcore != g_main_core): spdk_env_thread_launch_pinned(worker->lcore, work_fn, worker); // worker thread 一一对应到 core、每个 core 绑定一个 work_fn
}
work_fn(main_worker);
// 每个 core 执行 work_fn
work_fn(worker):
TAILQ_FOREACH(ns_ctx, &worker->ns_ctx, link) {
nvme_init_ns_worker_ctx(ns_ctx): // 为每个 ns 分配 io_qpairs
spdk_nvme_ctrlr_get_default_io_qpair_opts(ns_entry->u.nvme.ctrlr, &opts, sizeof(opts));
ctrlr_opts = spdk_nvme_ctrlr_get_opts(ns_entry->u.nvme.ctrlr);
ns_ctx->u.nvme.group = spdk_nvme_poll_group_create(ns_ctx, NULL): // 创建 nvme_poll_group
group->ctx = ns_ctx;
STAILQ_INIT(&group->tgroups); // tgroup: transport_poll_group
for (i < ns_ctx->u.nvme.num_all_qpairs) {
// u.nvme.qpair 是二维指针
// 1. 创建 io qpair
ns_ctx->u.nvme.qpair[i] = spdk_nvme_ctrlr_alloc_io_qpair(ns_entry->u.nvme.ctrlr, &opts, ...):
qpair = nvme_ctrlr_create_io_qpair(ctrlr, &opts):
qid = spdk_nvme_ctrlr_alloc_qid(ctrlr);
qpair = nvme_rdma_ctrlr_create_io_qpair(ctrlr, qid, opts): // 函数指针
nvme_rdma_ctrlr_create_qpair(ctrlr, qid, ..., opts->async_mode = true):
// 与前文 fabric_ctrlr_scan 创建 adminq 类似
rqpair->state = NVME_RDMA_QPAIR_STATE_INVALID; // 创建了 rqpair、设置状态为 INVALID
qpair = &rqpair->qpair;
nvme_qpair_init(qpair, qid, ctrlr, qprio, num_requests, async):
... // 设置 qpair 部分字段
qpair->ctrlr = ctrlr;
qpair->trtype = ctrlr->trid.trtype;
qpair->async = async;
STAILQ_INIT(&qpair->free_req);
for {
STAILQ_INSERT_HEAD(&qpair->free_req, req, stailq);
}
TAILQ_INSERT_TAIL(&ctrlr->active_io_qpairs, qpair, tailq);
nvme_ctrlr_proc_add_io_qpair(qpair): // 与当前 active_proc 关联
TAILQ_INSERT_TAIL(&active_proc->allocated_io_qpairs, qpair, per_process_tailq);
qpair->active_proc = active_proc;
// create_only = true
// 2. qpair 添加 poll groups
spdk_nvme_poll_group_add(nvme_poll_group, qpair): // io_qpair 添加到 nvme_poll_group、transport_poll_group
while (transport) {
tgroup = nvme_transport_poll_group_create(transport): // 创建 transport_poll_group
nvme_rdma_poll_group_create(): // 创建 rdma_poll_group
STAILQ_INIT(&rdma_poll_group->pollers);
TAILQ_INIT(&rdma_poll_group->connecting_qpairs);
TAILQ_INIT(&rdma_poll_group->active_qpairs);
return rdma_poll_group->transport_poll_group;
STAILQ_INIT(&group->connected_qpairs);
STAILQ_INIT(&group->disconnected_qpairs);
tgroup->group = nvme_poll_group; // transport_poll_group 关联 nvme_poll_group
STAILQ_INSERT_TAIL(&nvme_poll_group->tgroups, tgroup, link);
}
nvme_transport_poll_group_add(tgroup, qpair):
qpair->poll_group = tgroup; // qpair 关联 transport_poll_group
nvme_rdma_poll_group_add(tgroup, qpair):
// do nothing
// 这里 rdma_poll_group 没有与 qpair 关联,在后面 rdma_poll_group 与 rdma_qpair 关联
// 3. 连接 io qpair
spdk_nvme_ctrlr_connect_io_qpair(entry->u.nvme.ctrlr, qpair):
nvme_transport_ctrlr_connect_qpair(ctrlr, qpair):
nvme_qpair_set_state(qpair, NVME_QPAIR_CONNECTING); // 这里设置了 qpair 状态为正在连接
nvme_rdma_ctrlr_connect_qpair(ctrlr, qpair): // 函数指针
// 通过 nvme_qpair、nvme_ctrlr 移动 n 个字节得到
rqpair = nvme_rdma_qpair(qpair): // rqpair 为 nvme_rdma_qpair
SPDK_CONTAINEROF(qpair, struct nvme_rdma_qpair, qpair):
#define SPDK_CONTAINEROF(ptr, type, member) ((type *)((uintptr_t)ptr - offsetof(type, member)));
/* offsetof(...): Offset of member MEMBER in a struct of type TYPE. */
rctrlr = nvme_rdma_ctrlr(ctrlr); // rctrlr 为 nvme_rdma_ctrlr
nvme_parse_addr(&dst_addr, ctrlr->trid.traddr, &port, ...); // 解析 destination_address
nvme_parse_addr(&src_addr, ctrlr->opts.src_addr, &src_port, ...); // 解析 source_address
rdma_create_id(rctrlr->cm_channel, &rqpair->cm_id, rqpair, RDMA_PS_TCP); // 分配 communication id
nvme_rdma_resolve_addr(rqpair, &src_addr, &dst_addr):
rdma_set_option(rqpair->cm_id, ...);
rdma_resolve_addr(rqpair->cm_id, src_addr, dst_addr, ...);
// 开始处理 rdma events
nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_ADDR_RESOLVED = 0, nvme_rdma_addr_resolved):
rqpair->evt_cb = evt_cb; // evt_cb = nvme_rdma_addr_resolved
... // 经过一系列操作,最后是怎么执行回调函数 nvme_rdma_addr_resolved()?
/** 这里的 rdma_process_event_start() 调用逻辑见后文 **/
rqpair->state = NVME_RDMA_QPAIR_STATE_INITIALIZING; // 注意这里设置了 rqpair 状态,可能用于状态机
rgroup = nvme_rdma_poll_group(qpair->poll_group); // rgroup 为 nvme_rdma_poll_group
TAILQ_INSERT_TAIL(&rgroup->connecting_qpairs, rqpair, link_connecting); // rdma_poll_group 与 rdma_qpair 关联
nvme_poll_group_connect_qpair(qpair):
nvme_transport_poll_group_connect_qpair(qpair):
nvme_rdma_poll_group_connect_qpair(qpair):
// do nothing
// 这里 rdma_poll_group 没有连接 rqair
// rqpair 在什么时候从 rgroup->connecting_qpairs 中移除并加入到 rgroup->active_qpairs?
// active_qpairs 是在 submit_request 中才使用,因此作用理解为当 rdma_qpair 有 req 时,加入到 active_qpairs 中
STAILQ_INSERT_TAIL(&tgroup->connected_qpairs, qpair, poll_group_stailq);
// qpair->async = true
// 4. poll group 处理 qpairs 完成事件
while {
/** poll for completions on all qpairs in this poll group **/
// 与 检查 io 数据流 类似,见后文
spdk_nvme_poll_group_process_completions(group, 0, perf_disconnect_cb):
STAILQ_FOREACH(tgroup, &group->tgroups, link) {
nvme_transport_poll_group_process_completions(tgroup, ..., disconnected_qpair_cb):
nvme_rdma_poll_group_process_completions(tgroup, ..., disconnected_qpair_cb): // 函数指针
TAILQ_FOREACH_SAFE(rqpair, &rgroup->connecting_qpairs, ...) {
nvme_rdma_ctrlr_connect_qpair_poll(rqpair->qpair->ctrlr, rqpair->qpair):
// TODO 这一块没有看懂
// rdma_qpair 状态机
// 根据不同状态执行不同操作、通过 callback 转移状态、直到状态为 NVME_RDMA_QPAIR_STATE_RUNNING 结束
// 在这里可能执行了 rqpair->evt_cb = nvme_rdma_addr_resolved() 函数
// 然后 rdma 解析完 addr 后进行路由
// 按以上思路走
// rqpair->state = NVME_RDMA_QPAIR_STATE_INITIALIZING
...
TAILQ_REMOVE(&rgroup->connecting_qpairs, rqpair, ...);
// submit queued requests
nvme_qpair_resubmit_requests(rqpair->qpair, rqpair->num_entries):
... // 与 发送数据流 类似,见后文
}
STAILQ_FOREACH_SAFE(qpair, &tgroup->connected_qpairs, ...) {
nvme_rdma_qpair_process_cm_event(rqpair);
... // rdma_qpair 状态机
}
STAILQ_FOREACH(poller, &rgroup->pollers, ...) {
nvme_rdma_cq_process_completions(poller->cq, batch_size, poller, NULL, &rdma_completions); // 处理 rdma cq
... // 与 检查 io 数据流 类似,见后文
}
}
spdk_nvme_poll_group_all_connected(group):
... // 检查是否都 connected,否则 while 循环继续
}
}
}
pthread_barrier_wait(&g_worker_sync_barrier); // 线程同步
TAILQ_FOREACH(ns_ctx, &worker->ns_ctx, link) {
submit_io(ns_ctx, g_queue_depth); // 发送数据流,见后文
}
while (!g_exit) {
TAILQ_FOREACH(ns_ctx, &worker->ns_ctx, link) {
ns_ctx->entry->fn_table->check_io(ns_ctx); // 检查 io 数据流,见后文
... // 计时、信息统计
}
... // 遍历完一次队列之后就检查时间,如果超过指定的时间就退出,否则继续
}
... // 计时、信息统计、ns_ctx 清理
// 回到主线程 main():
spdk_env_thread_wait_all();
print_stats(); // 打印信息统计
pthread_barrier_destroy(&g_worker_sync_barrier);
... // cleanup
unregister_trids();
unregister_namespaces();
unregister_controllers();
unregister_workers();
spdk_env_fini();

工作任务大体步骤:

  • 一般每个线程管理一个 NS;
  • 为每个 NS 创建 n 个(假设一个)io QP
  • io QP 需要与 Target 端 io QP 建立对应连接,因此需要建立 RDMA 连接;
  • io QP 创建轮询组 poll_group 以及轮询器 poller
  • 准备发送 IO 和检查 IO 完成情况。

(五)控制流(建立 RDMA 层连接)

函数调用栈:

1
2
3
4
5
6
7
/** 开始解析地址 **/
nvme_rdma_resolve_addr(rqpair, &src_addr, &dst_addr):
rdma_set_option(rqpair->cm_id, ...);
rdma_resolve_addr(rqpair->cm_id, src_addr, dst_addr, ...);
// 开始处理 rdma events
nvme_rdma_process_event_start(rqpair, RDMA_CM_EVENT_ADDR_RESOLVED = 0, nvme_rdma_addr_resolved):
// 略

RDMA 建立连接部分略。

在这一步中,在 RDMA 建立连接完成后,io QP 创建了 rdma_req 池和 rdma_rsp 池,并且 rsps 全部压到 RQ 中。


(六)数据流(发送)

流程图:

函数调用栈:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/* Submit initial I/O for each namespace. */
submit_io(ns_ctx, g_queue_depth):
while (queue_depth-- > 0) {
task = allocate_task(ns_ctx, queue_depth):
nvme_setup_payload(task, pattern = queue_depth % 8 + 1):
... // 设置 io 负载
task->ns_ctx = ns_ctx;
submit_single_io(task):
nvme_submit_io(task, task->ns_ctx, task->ns_ctx->entry, offset_in_ios):
qp_num = ns_ctx->u.nvme.last_qpair++;
spdk_nvme_ns_cmd_read_with_md(entry->u.nvme.ns, ns_ctx->u.nvme.qpair[qp_num], ..., io_complete, ...): // io_complete() callback
payload = NVME_PAYLOAD_CONTIG(buffer, metadata); // 配置 payload
req = _nvme_ns_cmd_rw(ns, qpair, &payload, SPDK_NVME_OPC_READ, ...):
req = nvme_allocate_request(qpair, payload, ...): // 从请求池中分配 req
req = STAILQ_FIRST(&qpair->free_req);
STAILQ_REMOVE_HEAD(&qpair->free_req, stailq);
NVME_INIT_REQUEST(req, cb_fn, cb_arg, *payload, ...);
... // 配置 req
_nvme_ns_cmd_setup_request(ns, req, ...); // 配置 req->cmd
nvme_qpair_submit_request(qpair, req): // 请求提交到 qpair
_nvme_qpair_submit_request(qpair, req):
nvme_transport_qpair_submit_request(qpair, req):
// admin queue 不支持将 transport 存储在 ctrlr 或 admin queue 中
// io queue 支持
if (!nvme_qpair_is_admin_queue(qpair)): qpair->transport->ops.qpair_submit_request(qpair, req);
else: transport->ops.qpair_submit_request(qpair, req): // transport 通过 nvme_get_transport(qpair->ctrlr->trid.trstring) 获取
nvme_rdma_qpair_submit_request(qpair, req):
rqpair = nvme_rdma_qpair(qpair);
rdma_req = nvme_rdma_req_get(rqpair);
nvme_rdma_req_init(rqpair, req, rdma_req):
rdma_req->req = req;
req->cmd.cid = rdma_req->id;
... // 配置 rdma_req
TAILQ_INSERT_TAIL(&rqpair->outstanding_reqs, rdma_req, link); // 添加到 rqpair->outstanding_reqs 队列中
TAILQ_INSERT_TAIL(&rgroup->active_qpairs, rqpair, link_active); // rqpair 添加到 rgroup->active_qpairs 队列中
// 发送请求排队
spdk_rdma_qp_queue_send_wrs(rqpair->rdma_qp, wr);
// 提交发送请求
nvme_rdma_qpair_submit_sends(rqpair);
// 刷新 rqpair
spdk_rdma_qp_flush_send_wrs(rqpair->rdma_qp, &bad_send_wr):
// 提交发送请求至 send queue
ibv_post_send(spdk_rdma_qp->qp, spdk_rdma_qp->send_wrs.first, bad_wr);
/** 发送请求的过程结束 **/

}

发送 IO 大体步骤:

  • 以 IO 任务的形式下发。这个 IO 任务在后续被接收完成后会被复用,直到超过运行时间后回收内存并释放;
  • IO 任务可以看作是 io queue 的槽,一共会有 g_queue_depth 个 IO 任务;
  • 在发送前,设置 IO 任务的 io 偏移和随机读写,然后提交任务生成 nvme_req 请求;
  • nvme SQ 的队列长度一般小于 io queueg_queue_depth,所以会出现 IO 任务排队;
  • 通过层层封装、转换,经过 perf_task -> nvme_req -> rdma_req 后,通过 RDMA ibverbs 发送请求。

(七)数据流(检查 IO 完成情况)

流程图:

函数调用栈:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
check_io(ns_ctx):
nvme_check_io(ns_ctx):
/** poll for completions on all qpairs in this poll group **/
spdk_nvme_poll_group_process_completions(ns_ctx->u.nvme.group, g_max_completions, perf_disconnect_cb):
STAILQ_FOREACH(tgroup, &group->tgroups, link) {
nvme_transport_poll_group_process_completions(tgroup, completions_per_qpair, ...):
nvme_rdma_poll_group_process_completions(tgroup, completions_per_qpair, ...):
// 1. 处理 cm_events
STAILQ_FOREACH_SAFE(qpair, &tgroup->connected_qpairs, ...) {
rqpair = nvme_rdma_qpair(qpair);
/** event 为空,没有执行
nvme_rdma_qpair_process_cm_event(rqpair);
*/
}
// 2. poll CQ
STAILQ_FOREACH(poller, &rgroup->pollers, link) {
while {
nvme_rdma_cq_process_completions(poller->cq, batch_size, poller, rdma_qpair = NULL, &rdma_completions):
rc = ibv_poll_cq(cq, batch_size, ibv_wc); // 从 CQ 中取 work completion
for (i < rc) {
rdma_wr = (struct nvme_rdma_wr *)ibv_wc[i].wr_id;
/** 处理 recv */
if (rdma_wr->type == RDMA_WR_TYPE_RECV): nvme_rdma_process_recv_completion(poller, &ibv_wc[i], rdma_wr): // rdma work request 类型为接收
rdma_rsp = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvme_rdma_rsp, rdma_wr);
rqpair = rdma_rsp->rqpair;
rdma_req = &rqpair->rdma_reqs[rdma_rsp->cpl.cid];
rdma_req->completion_flags |= NVME_RDMA_RECV_COMPLETED;
rdma_req->rdma_rsp = rdma_rsp;
// 这里是如何不进入 if 代码段?
if ((rdma_req->completion_flags & NVME_RDMA_SEND_COMPLETED) == 0) {
return 0;
}
nvme_rdma_request_ready(rqpair, rdma_req):
rdma_rsp = rdma_req->rdma_rsp;
recv_wr = rdma_rsp->recv_wr;
recv_wr->next = NULL;
nvme_rdma_req_complete(rdma_req, &rdma_rsp->cpl, true):
req = rdma_req->req;
qpair = rdma_req->req->qpair;
rqpair = nvme_rdma_qpair(qpair);
TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
nvme_complete_request(req->cb_fn, req->cb_arg, qpair, req, rsp):
_nvme_free_request(req, qpair); // 回收 req
io_complete(cb_arg = perf_task, cpl):
task_complete(task):
... // 计时、信息统计
nvme_rdma_req_put(rqpair, rdma_req); // 回收 rreq
// 接收请求排队
// 疑问:为啥要提交接收请求?
// 可能是因为 RQ 队列需要随时准备 recv_wr 来随时准备接收?
spdk_rdma_qp_queue_recv_wrs(rqpair->rdma_qp, recv_wr):
rdma_queue_recv_wrs(&spdk_rdma_qp->recv_wrs, first, &spdk_rdma_qp->stats->recv);
// 提交接收请求
nvme_rdma_qpair_submit_recvs(rqpair):
spdk_rdma_qp_flush_recv_wrs(rqpair->rdma_qp, &bad_recv_wr);
ibv_post_recv(spdk_rdma_qp->qp, spdk_rdma_qp->recv_wrs.first, bad_wr);
/** 处理 send */
else (rdma_wr->type == RDMA_WR_TYPE_SEND): nvme_rdma_process_send_completion(poller, rdma_qpair, &ibv_wc[i], rdma_wr): // rdma work request 类型为发送
rdma_req = SPDK_CONTAINEROF(rdma_wr, struct spdk_nvme_rdma_req, rdma_wr);
rqpair = nvme_rdma_qpair(rdma_req->req->qpair);
rdma_req->completion_flags |= NVME_RDMA_SEND_COMPLETED;
// ?
if ((rdma_req->completion_flags & NVME_RDMA_RECV_COMPLETED) == 0) {
return 0;
}
/** 以下代码段没有执行
nvme_rdma_request_ready(rqpair, rdma_req):
rdma_rsp = rdma_req->rdma_rsp;
recv_wr = rdma_rsp->recv_wr;
nvme_rdma_req_complete(rdma_req, &rdma_rsp->cpl, true):
qpair = rdma_req->req->qpair;
rqpair = nvme_rdma_qpair(qpair);
TAILQ_REMOVE(&rqpair->outstanding_reqs, rdma_req, link);
nvme_complete_request(req->cb_fn, req->cb_arg, qpair, req, rsp):
_nvme_free_request(req, qpair); // 回收 req
io_complete(cb_arg = perf_task, cpl):
task_complete(task):
... // 计时、信息统计
nvme_rdma_req_put(rqpair, rdma_req); // 回收 rreq
// 接收请求排队
// 疑问:为啥要接收请求?
spdk_rdma_qp_queue_recv_wrs(rqpair->rdma_qp, recv_wr):
rdma_queue_recv_wrs(&spdk_rdma_qp->recv_wrs, first, &spdk_rdma_qp->stats->recv);
// 提交接收请求
nvme_rdma_qpair_submit_recvs(rqpair):
spdk_rdma_qp_flush_recv_wrs(rqpair->rdma_qp, &bad_recv_wr);
ibv_post_recv(spdk_rdma_qp->qp, spdk_rdma_qp->recv_wrs.first, bad_wr);
**/
}

}
}
// 3. 活跃 QP 提交 send/recv 请求
TAILQ_FOREACH_SAFE(rqpair, &group->active_qpairs, link_active, tmp_rqpair) {
nvme_rdma_qpair_process_submits(group, rqpair):
// 提交 send_wrs 和 recv_wrs
nvme_rdma_qpair_submit_sends(rqpair);
nvme_rdma_qpair_submit_recvs(rqpair);
// 重新提交 queued 请求
if (rqpair->num_completions > 0): nvme_qpair_resubmit_requests(qpair, rqpair->num_completions);
}
}

检查 IO 完成情况大体步骤:

  • CQ 收到 CQE 后,通过结构体偏移得到 WQE,判断是 SEND 还是 RECV 请求的完成消息;
  • 只有当 WQE 指向的 rdma_reqcompletion_flagsSEND_COMPLETED & RECV_COMPLETED 即发送和接收都完成了,才判断为 IO 任务完成;
  • 通过 callback 得到原始 IO 任务;
  • 记录时间,统计信息;
  • IO 任务被复用,重新设置 io 偏移量以及随机读写,重复发送过程。