# VPP Node/Feature ![node_graph.png](../../_static/node_graph.png) ## VPP-Node ### Node 相关结构体 * `vlib_node_t` 结点的主结构,包括结点的处理功能函数,名称,结点类型等,主要保存一些相对静态信息。 * `vlib_node_main_t` Node graph相关的全局信息,Nodes 数组、vlib_next_frame_t、vlib_pending_frame_t等数据。 * `vlib_node_registration_t` 注册 Node 结点时使用,保存结点业务逻辑的函数地址,结点类型,结点状态,结点名称等。 * `vlib_node_runtime_t` 实际在调度 Node 过程中使用的结构,主要记录在处理过程中的信息变动。 * `vlib_frame_t` 每个 Node 都有一个对应的vlib_frame_t,用来保存供 Node 使用的数据集合(标量、矢量)。 * `vlib_next_frame_t` 主要是 Node 内部逻辑使用,用于定位该 Node 的下一结点所对应的 Frame 地址。 * `vlib_pending_frame_t` 当一个 Node 处理完数据包,则填充该待处理帧管理表数据结,调度框架便能在下一次调度时找到需要接收该数据包的下一个Node。 ### Node 类型 ``` VLIB_NODE_TYPE_INTERN : 表示处理报文的业务 Node,可在 main/worker 线程运行。 VLIB_NODE_TYPE_INPUT : 表示为 INTERNAL 类型 Node 提供报文;收报逻辑 Node,比如 dpdk-input,可在 main/worker 线程运行。 VLIB_NODE_TYPE_PRE_INPUT : 主要是为 PROCESS 类型的 Node 提供数据,目前只有一个 epoll node,对 socket 相关逻辑提供服务,主要使用在控制业务上面,可在 main 线程运行。 VLIB_NODE_TYPE_PROCESS : 线程节点,和线程一样,可以暂停、等待、恢复时间,不同于 pthread_create,他是基于setjump/longjump实现的协程,可在 main 线程运行。 ``` **setJump 和 longJump 简单说明 ,详情参考**https://www.jianshu.com/p/67592ec248f1 ``` #include int setjump(jmp_buf env); int sigsetjump(sigjmp_buf env,int savesigs); ``` 非局部跳转语句— setjmp 和 longjmp 函数。非局部指的是,这不是由普通 C 语言 goto 语句在一个函数内实施的跳转,而是在栈上跳过若干调用帧,返回到当前函数调用路径上的某一个函数中。 ### 调度方式 共分为三种状态: ``` VLIB_NODE_STATE_POLLING : 轮询模式 VLIB_NODE_STATE_INTERRUPT : 中断模式 VLIB_NODE_STATE_DISABLED : 如果被设置了该状态,则不会被调用 ``` **从代码上来看默认全是轮询方式。** ``` /* Next process input nodes. */ vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]) cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT, VLIB_NODE_STATE_POLLING, /* frame */ 0, cpu_time_now); /////////////////////////////////////////////////////// last_time_stamp = dispatch_node (vm, n, VLIB_NODE_TYPE_INTERNAL, VLIB_NODE_STATE_POLLING, f, last_time_stamp); ``` **中断和轮询之间的切换:** - 模式切换依据矢量速率(累计报文数量),在vlib_main_or_worker_loop启动中设置。 - 当处于中断模式且矢量速率超过阈值时,切换至轮询模式。 - 当处于轮询模式且矢量速率低于阈值时,切换至中断模式。 这里通俗理解一下,意思就是流量大,报文密集的时候,使用轮询模式;流量小的时候,就切换到中断模式,减少cpu压力。 这个切换的原理想简单一点,还是比较容易理解的,如果当前模式为轮询状态的时候,流量突然变小了,这样就会出现cpu空转,可能转了几次才收到2、3个报文,甚至1个;这样就太浪费资源了,就切换到中断模式。 反过来想,如果当前模式是中断模式的时候,流量突然变大,cpu发生中断的那一刻,累计了报文多了,一次vector packet太多,超过阈值就要切换过来,这样就提高性能了,否则就处理不过来了。 这个说法虽然很粗糙,但是理解起来比较容易。 #### Node Graph相关结构体 - `vlib_node_main_t` 节点图主结构,记录全局节点图的数据信息。 - `vlib_node_t` 单个节点的主结构,包括节点的处理功能函数,名称,节点类型等,主要保存一些相对静态信息。 - `vlib_node_registration_t` 注册 Node 节点时使用,保存节点业务逻辑的函数地址,节点类型,节点状态,节点名称等。 - `vlib_node_runtime_t` 实际在调度 Node 过程中使用的结构,主要记录在处理过程中的信息变动。 - `vlib_frame_t` 保存每个 Node 对应的要处理的数据的内存地址信息。 - `vlib_pending_frame_t` 记录运行节点的索引,数据包索引,以及下一个数据包的索引。 - `vlib_next_frame_t` 主要用于获取 Node 要处理的下一条数据。 #### VLIB_NODE_TYPE_INPUT节点类型 INPUT类型节点一般运行在 worker 线程,也可以配置在 main 线程里面运行。 调用流程在 `vlib_main_or_worker_loop` 函数的 while(1) 循环里面;简单去想,就是收报文用的。 ``` /* Next process input nodes. */ vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT]) cpu_time_now = dispatch_node (vm, n, VLIB_NODE_TYPE_INPUT, VLIB_NODE_STATE_POLLING, /* frame */ 0, cpu_time_now); ``` #### VLIB_NODE_TYPE_INTERNAL 节点类型 简单的说就是处理报文的。 这个我们可以通过分析 INPUT 类型的 Node 是如何把报文发送给 INTERNAL 类型的 Node 的整个过程来理解,具体步骤分为以下几步: 1.确定把报文发送给第几个 Next 节点,怎么解释这一步? 把报文送给哪个 Next 节点处理,是由该 Node 的业务逻辑决定的,通常有两种方式: - 一是通过解析报文的内容来决定,比如 `ethernet-input` 节点通过解析报文是 ipv4 还是 ipv6 报文把报文发送给 `ip4-input` 或 `ip6-input` 去处理; - 二是通过查找转发表来决定,比如 `ip4-lookup` 通过查找路由表来决定把报文发送给 `ip4-arp` 还是 `ip4-rewrite` 节点来处理。 2.找到该 Next 节点对应的用于存放报文的结构 vlib_frame_t(保存每个 Node 对应的要处理数据的内存地址信息),根据 Node 图的初始化流程可知,确定把报文发送给第几个 Next 节点之后,就可以获取该 Next 节点对应的 `vlib_next_frame_t` (主要用于获取node要处理的下一条数据) 结构了。`vlib_next_frame_t` 结构中的`frame_index` 字段指向 `vlib_frame_t` 结构,`vlib_frame_t` 结构是存放报文索引的地方(也就 `vlib_buffer_t` 结构索引)。 3.把报文放入 `vlib_frame_t` 结构,`vlib_frame_t` 结构最后一个字段(u8 arguments[0])为可变数组,初始化的时候会分配一块内存,用于存放报文的索引。 4.创建 `vlib_pending_frame_t` 结构,并把它加入动态数组 `vm->node_main. pending_frames` 等待调度,`vlib_pending_frame_t` 结构记录报文所在的结构 `vlib_next_frame_t` 的 索引,以及处理这些报文的 Node 的 `vlib_node_runtime_t` 结构的索引,这样通过 `vlib_pending_frame_t` 结构里面的信息就可以把报文分发给指定的 Node 处理了。 我们看一段 `test_proc` 的部分代码 ``` always_inline uword test_proc(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame, u32 is_ip4) { u16 next_node[VLIB_FRAME_SIZE]; u32 buf_out[VLIB_FRAME_SIZE]; u32 nums = 0; u32 n_left_from = 0; u32 *from = NULL; from = vlib_frame_vector_args(frame); n_left_from = frame->n_vectors; // clib_warning("n_left_from %u",n_left_from); while (n_left_from > 0) { if (is_ip4) { next_node[nums] = ipv4_statistics(vm, from[0]); } else { next_node[nums] = ipv6_statistics(vm, from[0]); } buf_out[nums] = from[0]; nums++; from += 1; n_left_from -= 1; } if (nums > 0) {//这个函数是重点,就是完成了上面描述的2-4的流程 vlib_buffer_enqueue_to_next(vm, node, buf_out, next_node, nums); } return (nums); } ``` `vlib_buffer_enqueue_to_next` 该函数里面重要的两个调用,一个是宏 `vlib_get_next_frame` ,一个是函数 `vlib_put_next_frame`;其他都是以下计算偏移代码和赋值代码,并且都支持指令集优化。 - `vlib_get_next_frame` 功能是:找到该 Next 节点对应的用于存放报文的结构 `vlib_frame_t`,返回 :`to_next` :返回当前要存放报文索引位置;`n_left_to_next`:存放 `to_next` 报文空间大小。 - 函数 `vlib_put_next_frame` ,先将报文加入数组 `vm->vlib_node_main_t.pending_frames` 等待调度处理。 ``` static_always_inline void vlib_buffer_enqueue_to_next (vlib_main_t * vm, vlib_node_runtime_t * node, u32 * buffers, u16 * nexts, uword count) { u32 *to_next, n_left_to_next, max; u16 next_index; next_index = nexts[0]; vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next); max = clib_min (n_left_to_next, count); while (count) { u32 n_enqueued; if ((nexts[0] != next_index) || n_left_to_next == 0) { vlib_put_next_frame (vm, node, next_index, n_left_to_next); next_index = nexts[0]; vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next); max = clib_min (n_left_to_next, count); } //这里的宏定义时判断支持那种指令集优化,主要时针对向量指令集 #if defined(CLIB_HAVE_VEC512) u16x32 next32 = CLIB_MEM_OVERFLOW_LOAD (u16x32_load_unaligned, nexts); next32 = (next32 == u16x32_splat (next32[0])); u64 bitmap = u16x32_msb_mask (next32); n_enqueued = count_trailing_zeros (~bitmap); #elif defined(CLIB_HAVE_VEC256) u16x16 next16 = CLIB_MEM_OVERFLOW_LOAD (u16x16_load_unaligned, nexts); next16 = (next16 == u16x16_splat (next16[0])); u64 bitmap = u8x32_msb_mask ((u8x32) next16); n_enqueued = count_trailing_zeros (~bitmap) / 2; #elif defined(CLIB_HAVE_VEC128) && defined(CLIB_HAVE_VEC128_MSB_MASK) u16x8 next8 = CLIB_MEM_OVERFLOW_LOAD (u16x8_load_unaligned, nexts); next8 = (next8 == u16x8_splat (next8[0])); u64 bitmap = u8x16_msb_mask ((u8x16) next8); n_enqueued = count_trailing_zeros (~bitmap) / 2; #else u16 x = 0; if (count + 3 < max) { x |= next_index ^ nexts[1]; x |= next_index ^ nexts[2]; x |= next_index ^ nexts[3]; n_enqueued = (x == 0) ? 4 : 1; } else n_enqueued = 1; #endif if (PREDICT_FALSE (n_enqueued > max)) n_enqueued = max; #ifdef CLIB_HAVE_VEC512 if (n_enqueued >= 32) { vlib_buffer_copy_indices (to_next, buffers, 32); nexts += 32; to_next += 32; buffers += 32; n_left_to_next -= 32; count -= 32; max -= 32; continue; } #endif #ifdef CLIB_HAVE_VEC256 if (n_enqueued >= 16) { vlib_buffer_copy_indices (to_next, buffers, 16); nexts += 16; to_next += 16; buffers += 16; n_left_to_next -= 16; count -= 16; max -= 16; continue; } #endif #ifdef CLIB_HAVE_VEC128 if (n_enqueued >= 8) { vlib_buffer_copy_indices (to_next, buffers, 8); nexts += 8; to_next += 8; buffers += 8; n_left_to_next -= 8; count -= 8; max -= 8; continue; } #endif if (n_enqueued >= 4) { vlib_buffer_copy_indices (to_next, buffers, 4); nexts += 4; to_next += 4; buffers += 4; n_left_to_next -= 4; count -= 4; max -= 4; continue; } /* copy */ to_next[0] = buffers[0]; /* next */ nexts += 1; to_next += 1; buffers += 1; n_left_to_next -= 1; count -= 1; max -= 1; } vlib_put_next_frame (vm, node, next_index, n_left_to_next); } ``` ### 注册节点 VPP 提供了一套通用的注册节点方法。具体实现如下: ``` VLIB_NODE_FN(xxx_v4) (vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame) { return xxx_proc(vm, node, frame, 1); } VLIB_NODE_FN(xxx_v6) (vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame) { return xxx_proc(vm, node, frame, 0); } /* *INDENT-OFF* */ #ifndef CLIB_MARCH_VARIANT VLIB_REGISTER_NODE(xxx_v4) = { .name = "hds-v4", .vector_size = sizeof(u32), .format_trace = format_hds_trace, .type = VLIB_NODE_TYPE_INTERNAL, .n_errors = ARRAY_LEN(hds_error_strings), .error_strings = xxx_error_strings, .n_next_nodes = XXX_N_NEXT, /* edit / add dispositions here */ .next_nodes = { [XXX_NEXT_DROP] = "error-drop", [XXX_NEXT_IP4_LOOKUP] = "ip4-lookup", [XXX_NEXT_IP6_LOOKUP] = "ip6-lookup", }, }; #endif /* CLIB_MARCH_VARIANT */ /* *INDENT-ON* */ /* *INDENT-OFF* */ #ifndef CLIB_MARCH_VARIANT VLIB_REGISTER_NODE(xxx_v6) = { .name = "xxx-v6", .vector_size = sizeof(u32), .format_trace = format_hds_trace, .type = VLIB_NODE_TYPE_INTERNAL, .n_errors = ARRAY_LEN(hds_error_strings), .error_strings = xxx_error_strings, .n_next_nodes = XXX_N_NEXT, /* edit / add dispositions here */ .next_nodes = { [XXX_NEXT_DROP] = "error-drop", [XXX_NEXT_IP4_LOOKUP] = "ip4-lookup", [XXX_NEXT_IP6_LOOKUP] = "ip6-lookup", }, }; #endif /* CLIB_MARCH_VARIANT */ ``` ## VPP-Feature VPP 内部业务逻辑是通过一系列的 Node 连接来实现的,这些 Node 通常在初始化时定义好,比如二层以太处理 `ethernet-input`,三层 `ip4-input` 等,通过初始化定义,将 Node 连接成一个有序的向量图,来实现 VPP 的业务功能。 而早期的 VPP 本身的 Node 框架比较固定,各个 Node 之间逻辑连接已经固定了。为此新版本增加了 Feature 机制,每个 Feature 是一个 Node,用户可以启用/停止某个或某些 Feature。用户也可以自己写插件,把自定义 Node(自己的业务逻辑)加入到指定位置。 VPP 中,将不同的 Feature 按照类型分成了不同的组,每组 Feature 称之为一个 Arc。Arc中的 Feature 按照代码指定的顺序串接起来。 Arc 结构中,记录这组 Feature 中的起始 Node 和结束 Node。系统初始化时,会完成初步的排序,但并没有应用到对应的接口中。 **初始化** `VNET_FEATURE_ARC_INIT` 宏用来注册 `vnet_feature_arc_registration_t` 类型的 Arc,将其注册到 `vnet_feature_main_t` 类型的全局变量feature_main。每个 Arc 包含一组 Feature。 源码分析举例: `src/vnet/ip/ip4_forward.c:` `VNET_FEATURE_ARC_INIT` 宏进行注册 Arc ``` /* Built-in ip4 unicast rx feature path definition */ /* *INDENT-OFF* */ VNET_FEATURE_ARC_INIT (ip4_unicast, static) = { .arc_name = "ip4-unicast", //这两个实际上在运行时只会有一个,通过配置指定运行哪一个node;一个计算ip校验和,一个不计算 .start_nodes = VNET_FEATURES ("ip4-input", "ip4-input-no-checksum"), .last_in_arc = "ip4-lookup", .arc_index_ptr = &ip4_main.lookup_main.ucast_feature_arc_index, }; ``` `VNET_FEATURE_ARC_INIT` 宏展开 ``` static vnet_feature_arc_registration_t vnet_feat_arc_ip4_unicast = { .arc_name = "ip4-unicast", .start_nodes = (char*[]) { "ip4-input", "ip4-input-no-checksum", 0}, .last_in_arc = "ip4-lookup", .arc_index_ptr = &ip4_main.lookup_main.ucast_feature_arc_index, }; /*__attribute__((constructor))在main函数之前,执行一个函数*/ static void __vnet_add_feature_arc_registration_ip4_unicast(void) __attribute__((__constructor__)); static void __vnet_add_feature_arc_registration_ip4_unicast(void) { vnet_feature_main_t *fm = &feature_main; vnet_feat_arc_ip4_unicast.next = fm->next_arc; fm->next_arc = &vnet_feat_arc_ip4_unicast; } /*__attribute__((__destructor__)) 系统在main()函数退出或者调用了exit()之后,调用的函数*/ static void __vnet_rm_feature_arc_registration_ip4_unicast(void) __attribute__((__destructor__)); static void __vnet_rm_feature_arc_registration_ip4_unicast(void) { vnet_feature_main_t *fm = &feature_main; vnet_feature_arc_registration_t *r = &vnet_feat_arc_ip4_unicast; { ASSERT(fm->next_arc); if (fm->next_arc == r) fm->next_arc = (r)->next; else { __typeof__(r) current = fm->next_arc; while (current->next) { if (current->next == r) { current->next = current->next->next; break; } current = current->next; } ASSERT(current); } } } ``` **Arc 类型:** ![image](https://cdn.nlark.com/yuque/0/2020/png/584732/1598410229041-86509955-9262-458e-bca2-39dfea52426c.png) 每种 Arc 都对应了不同的一组功能,常用的如`ip4-unicast`(处理 IPv4 报文单播)、`ip4-local`(处理目的为自己的 IPv4 报文)、`ip4-punt`(处理 IPv4 本地上送报文)等。 **添加 Feature 到指定的 Arc 里面** `VNET_FEATURE_INIT` 宏用来注册 `vnet_feature_registration_t` 类型的 Feature,将其注册到 `vnet_feature_main_t` 类型的全局变量 `feature_main`。 `VNET_FEATURE_INIT` 宏注册 Feature ``` /* *INDENT-OFF* */ VNET_FEATURE_INIT(xxx_v4, static) = { .arc_name = "ip4-unicast", .node_name = "xxx-v4", .runs_before = VNET_FEATURES("ip4-lookup"), }; ``` `VNET_FEATURE_INIT` 宏展开 ``` static vnet_feature_registration_t vnet_feat_xxx_v4 = { .arc_name = "ip4-unicast", .node_name = "xxx-v4", .runs_before = VNET_FEATURES("ip4-lookup"), }; static void __vnet_add_feature_registration_xxx_v4(void) __attribute__((__constructor__)); static void __vnet_add_feature_registration_xxx_v4(void) { vnet_feature_main_t *fm = &feature_main; vnet_feat_xxx_v4.next = fm->next_feature; fm->next_feature = &vnet_feat_xxx_v4; } static void __vnet_rm_feature_registration_xxx_v4(void) __attribute__((__destructor__)); static void __vnet_rm_feature_registration_xxx_v4(void) { vnet_feature_main_t *fm = &feature_main; vnet_feature_registration_t *r = &vnet_feat_xxx_v4; { ASSERT(fm->next_feature); if (fm->next_feature == r) fm->next_feature = (r)->next; else { __typeof__(r) current = fm->next_feature; while (current->next) { if (current->next == r) { current->next = current->next->next; break; } current = current->next; } ASSERT(current); } } } ``` `vnet_feature_enable_disable` 该函数就是将某个 Feature 开启或关闭 我们可以通过实现命令行调用这个函数来实现启动和关闭 ``` vnet_feature_enable_disable("ip4-unicast", "xxx-v4", sw_if_index, enable_disable, 0, 0); vnet_feature_enable_disable("ip6-unicast", "xxx-v6", sw_if_index, enable_disable, 0, 0); ``` `vnet_feature_arc_start` 该函数主要由 Arc 内的 start node 调用,在 start node 中通过该函数,进入到已经组织好的 Feature 的 Node 的矢量图中。 比如在 `ipv4-unicast` 这个 Arc 中,其 start node 之一是 `ip4-input`,那么在 `ip4-input` 的处理函数中,将会调用此函数进入到 Feature 的 Node 矢量图中。 ## 总结 这篇我们主要是理解 Node、Feature、Arc 的三者的关系,搞清楚这个了,我们就能灵活的添加自己的业务在 VPP 里面了。