VPP Node/Feature

node_graph.png

VPP-Node

Node 相关结构体

  • vlib_node_t 结点的主结构,包括结点的处理功能函数,名称,结点类型等,主要保存一些相对静态信息。

  • vlib_node_main_t Node graph相关的全局信息,Nodes 数组、vlib_next_frame_t、vlib_pending_frame_t等数据。

  • vlib_node_registration_t 注册 Node 结点时使用,保存结点业务逻辑的函数地址,结点类型,结点状态,结点名称等。

  • vlib_node_runtime_t 实际在调度 Node 过程中使用的结构,主要记录在处理过程中的信息变动。

  • vlib_frame_t 每个 Node 都有一个对应的vlib_frame_t,用来保存供 Node 使用的数据集合(标量、矢量)。

  • vlib_next_frame_t 主要是 Node 内部逻辑使用,用于定位该 Node 的下一结点所对应的 Frame 地址。

  • vlib_pending_frame_t 当一个 Node 处理完数据包,则填充该待处理帧管理表数据结,调度框架便能在下一次调度时找到需要接收该数据包的下一个Node。

Node 类型

VLIB_NODE_TYPE_INTERN : 表示处理报文的业务 Node,可在 main/worker 线程运行。
VLIB_NODE_TYPE_INPUT  : 表示为 INTERNAL 类型 Node 提供报文;收报逻辑 Node,比如 dpdk-input,可在 main/worker 线程运行。
VLIB_NODE_TYPE_PRE_INPUT : 主要是为 PROCESS 类型的 Node 提供数据,目前只有一个 epoll node,对 socket 相关逻辑提供服务,主要使用在控制业务上面,可在 main 线程运行。
VLIB_NODE_TYPE_PROCESS : 线程节点,和线程一样,可以暂停、等待、恢复时间,不同于 pthread_create,他是基于setjump/longjump实现的协程,可在 main 线程运行。

setJump 和 longJump 简单说明 ,详情参考https://www.jianshu.com/p/67592ec248f1

#include <setjump.h>
int setjump(jmp_buf env);
int sigsetjump(sigjmp_buf env,int savesigs);

非局部跳转语句— setjmp 和 longjmp 函数。非局部指的是,这不是由普通 C 语言 goto 语句在一个函数内实施的跳转,而是在栈上跳过若干调用帧,返回到当前函数调用路径上的某一个函数中。

调度方式

共分为三种状态:

VLIB_NODE_STATE_POLLING : 轮询模式
VLIB_NODE_STATE_INTERRUPT : 中断模式
VLIB_NODE_STATE_DISABLED : 如果被设置了该状态,则不会被调用

从代码上来看默认全是轮询方式。

      /* Next process input nodes. */
      vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
    cpu_time_now = dispatch_node (vm, n,
                      VLIB_NODE_TYPE_INPUT,
                      VLIB_NODE_STATE_POLLING,
                      /* frame */ 0,
                      cpu_time_now);

///////////////////////////////////////////////////////

  last_time_stamp = dispatch_node (vm, n,
                   VLIB_NODE_TYPE_INTERNAL,
                   VLIB_NODE_STATE_POLLING,
                   f, last_time_stamp);

中断和轮询之间的切换:

  • 模式切换依据矢量速率(累计报文数量),在vlib_main_or_worker_loop启动中设置。

  • 当处于中断模式且矢量速率超过阈值时,切换至轮询模式。

  • 当处于轮询模式且矢量速率低于阈值时,切换至中断模式。

这里通俗理解一下,意思就是流量大,报文密集的时候,使用轮询模式;流量小的时候,就切换到中断模式,减少cpu压力。

这个切换的原理想简单一点,还是比较容易理解的,如果当前模式为轮询状态的时候,流量突然变小了,这样就会出现cpu空转,可能转了几次才收到2、3个报文,甚至1个;这样就太浪费资源了,就切换到中断模式。

反过来想,如果当前模式是中断模式的时候,流量突然变大,cpu发生中断的那一刻,累计了报文多了,一次vector packet太多,超过阈值就要切换过来,这样就提高性能了,否则就处理不过来了。

这个说法虽然很粗糙,但是理解起来比较容易。

Node Graph相关结构体

  • vlib_node_main_t

    节点图主结构,记录全局节点图的数据信息。

  • vlib_node_t

    单个节点的主结构,包括节点的处理功能函数,名称,节点类型等,主要保存一些相对静态信息。

  • vlib_node_registration_t

    注册 Node 节点时使用,保存节点业务逻辑的函数地址,节点类型,节点状态,节点名称等。

  • vlib_node_runtime_t

    实际在调度 Node 过程中使用的结构,主要记录在处理过程中的信息变动。

  • vlib_frame_t

    保存每个 Node 对应的要处理的数据的内存地址信息。

  • vlib_pending_frame_t

    记录运行节点的索引,数据包索引,以及下一个数据包的索引。

  • vlib_next_frame_t

    主要用于获取 Node 要处理的下一条数据。

VLIB_NODE_TYPE_INPUT节点类型

INPUT类型节点一般运行在 worker 线程,也可以配置在 main 线程里面运行。

调用流程在 vlib_main_or_worker_loop 函数的 while(1) 循环里面;简单去想,就是收报文用的。

/* Next process input nodes. */
vec_foreach (n, nm->nodes_by_type[VLIB_NODE_TYPE_INPUT])
cpu_time_now = dispatch_node (vm, n,
               VLIB_NODE_TYPE_INPUT,
               VLIB_NODE_STATE_POLLING,
               /* frame */ 0,
               cpu_time_now);

VLIB_NODE_TYPE_INTERNAL 节点类型

简单的说就是处理报文的。

这个我们可以通过分析 INPUT 类型的 Node 是如何把报文发送给 INTERNAL 类型的 Node 的整个过程来理解,具体步骤分为以下几步:

1.确定把报文发送给第几个 Next 节点,怎么解释这一步?

把报文送给哪个 Next 节点处理,是由该 Node 的业务逻辑决定的,通常有两种方式:

  • 一是通过解析报文的内容来决定,比如 ethernet-input 节点通过解析报文是 ipv4 还是 ipv6 报文把报文发送给 ip4-inputip6-input 去处理;

  • 二是通过查找转发表来决定,比如 ip4-lookup 通过查找路由表来决定把报文发送给 ip4-arp 还是 ip4-rewrite 节点来处理。

2.找到该 Next 节点对应的用于存放报文的结构 vlib_frame_t(保存每个 Node 对应的要处理数据的内存地址信息),根据 Node 图的初始化流程可知,确定把报文发送给第几个 Next 节点之后,就可以获取该 Next 节点对应的 vlib_next_frame_t (主要用于获取node要处理的下一条数据) 结构了。vlib_next_frame_t 结构中的frame_index 字段指向 vlib_frame_t 结构,vlib_frame_t 结构是存放报文索引的地方(也就 vlib_buffer_t 结构索引)。

3.把报文放入 vlib_frame_t 结构,vlib_frame_t 结构最后一个字段(u8 arguments[0])为可变数组,初始化的时候会分配一块内存,用于存放报文的索引。

4.创建 vlib_pending_frame_t 结构,并把它加入动态数组 vm->node_main. pending_frames 等待调度,vlib_pending_frame_t 结构记录报文所在的结构 vlib_next_frame_t 的 索引,以及处理这些报文的 Node 的 vlib_node_runtime_t 结构的索引,这样通过 vlib_pending_frame_t 结构里面的信息就可以把报文分发给指定的 Node 处理了。

我们看一段 test_proc 的部分代码

always_inline uword test_proc(vlib_main_t *vm, vlib_node_runtime_t *node,
                             vlib_frame_t *frame, u32 is_ip4)
{
    u16 next_node[VLIB_FRAME_SIZE];
    u32 buf_out[VLIB_FRAME_SIZE];
    u32 nums = 0;
    u32 n_left_from = 0;
    u32 *from = NULL;
    from = vlib_frame_vector_args(frame);
    n_left_from = frame->n_vectors;
    //    clib_warning("n_left_from %u",n_left_from);
    while (n_left_from > 0)
    {
        if (is_ip4)
        {
            next_node[nums] = ipv4_statistics(vm, from[0]);
        }
        else
        {
            next_node[nums] = ipv6_statistics(vm, from[0]);
        }
        buf_out[nums] = from[0];
        nums++;
        from += 1;
        n_left_from -= 1;
    }
    if (nums > 0)
    {//这个函数是重点,就是完成了上面描述的2-4的流程
        vlib_buffer_enqueue_to_next(vm, node, buf_out, next_node, nums);
    }
    return (nums);
}

vlib_buffer_enqueue_to_next 该函数里面重要的两个调用,一个是宏 vlib_get_next_frame ,一个是函数 vlib_put_next_frame;其他都是以下计算偏移代码和赋值代码,并且都支持指令集优化。

  • vlib_get_next_frame 功能是:找到该 Next 节点对应的用于存放报文的结构 vlib_frame_t,返回 :to_next :返回当前要存放报文索引位置;n_left_to_next:存放 to_next 报文空间大小。

  • 函数 vlib_put_next_frame ,先将报文加入数组 vm->vlib_node_main_t.pending_frames 等待调度处理。

static_always_inline void
vlib_buffer_enqueue_to_next (vlib_main_t * vm, vlib_node_runtime_t * node,
                 u32 * buffers, u16 * nexts, uword count)
{
  u32 *to_next, n_left_to_next, max;
  u16 next_index;

  next_index = nexts[0];
  vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
  max = clib_min (n_left_to_next, count);

  while (count)
    {
      u32 n_enqueued;
      if ((nexts[0] != next_index) || n_left_to_next == 0)
    {
      vlib_put_next_frame (vm, node, next_index, n_left_to_next);
      next_index = nexts[0];
      vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);
      max = clib_min (n_left_to_next, count);
    }
     //这里的宏定义时判断支持那种指令集优化,主要时针对向量指令集
#if defined(CLIB_HAVE_VEC512)
      u16x32 next32 = CLIB_MEM_OVERFLOW_LOAD (u16x32_load_unaligned, nexts);
      next32 = (next32 == u16x32_splat (next32[0]));
      u64 bitmap = u16x32_msb_mask (next32);
      n_enqueued = count_trailing_zeros (~bitmap);
#elif defined(CLIB_HAVE_VEC256)
      u16x16 next16 = CLIB_MEM_OVERFLOW_LOAD (u16x16_load_unaligned, nexts);
      next16 = (next16 == u16x16_splat (next16[0]));
      u64 bitmap = u8x32_msb_mask ((u8x32) next16);
      n_enqueued = count_trailing_zeros (~bitmap) / 2;
#elif defined(CLIB_HAVE_VEC128) && defined(CLIB_HAVE_VEC128_MSB_MASK)
      u16x8 next8 = CLIB_MEM_OVERFLOW_LOAD (u16x8_load_unaligned, nexts);
      next8 = (next8 == u16x8_splat (next8[0]));
      u64 bitmap = u8x16_msb_mask ((u8x16) next8);
      n_enqueued = count_trailing_zeros (~bitmap) / 2;
#else
      u16 x = 0;
      if (count + 3 < max)
    {
      x |= next_index ^ nexts[1];
      x |= next_index ^ nexts[2];
      x |= next_index ^ nexts[3];
      n_enqueued = (x == 0) ? 4 : 1;
    }
      else
    n_enqueued = 1;
#endif

      if (PREDICT_FALSE (n_enqueued > max))
    n_enqueued = max;

#ifdef CLIB_HAVE_VEC512
      if (n_enqueued >= 32)
    {
      vlib_buffer_copy_indices (to_next, buffers, 32);
      nexts += 32;
      to_next += 32;
      buffers += 32;
      n_left_to_next -= 32;
      count -= 32;
      max -= 32;
      continue;
    }
#endif

#ifdef CLIB_HAVE_VEC256
      if (n_enqueued >= 16)
    {
      vlib_buffer_copy_indices (to_next, buffers, 16);
      nexts += 16;
      to_next += 16;
      buffers += 16;
      n_left_to_next -= 16;
      count -= 16;
      max -= 16;
      continue;
    }
#endif

#ifdef CLIB_HAVE_VEC128
      if (n_enqueued >= 8)
    {
      vlib_buffer_copy_indices (to_next, buffers, 8);
      nexts += 8;
      to_next += 8;
      buffers += 8;
      n_left_to_next -= 8;
      count -= 8;
      max -= 8;
      continue;
    }
#endif

      if (n_enqueued >= 4)
    {
      vlib_buffer_copy_indices (to_next, buffers, 4);
      nexts += 4;
      to_next += 4;
      buffers += 4;
      n_left_to_next -= 4;
      count -= 4;
      max -= 4;
      continue;
    }

      /* copy */
      to_next[0] = buffers[0];

      /* next */
      nexts += 1;
      to_next += 1;
      buffers += 1;
      n_left_to_next -= 1;
      count -= 1;
      max -= 1;
    }
  vlib_put_next_frame (vm, node, next_index, n_left_to_next);
}

注册节点

VPP 提供了一套通用的注册节点方法。具体实现如下:

VLIB_NODE_FN(xxx_v4)
(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
{
    return xxx_proc(vm, node, frame, 1);
}

VLIB_NODE_FN(xxx_v6)
(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *frame)
{
    return xxx_proc(vm, node, frame, 0);
}

/* *INDENT-OFF* */
#ifndef CLIB_MARCH_VARIANT
VLIB_REGISTER_NODE(xxx_v4) = {
    .name = "hds-v4",
    .vector_size = sizeof(u32),
    .format_trace = format_hds_trace,
    .type = VLIB_NODE_TYPE_INTERNAL,

    .n_errors = ARRAY_LEN(hds_error_strings),
    .error_strings = xxx_error_strings,

    .n_next_nodes = XXX_N_NEXT,

    /* edit / add dispositions here */
    .next_nodes =
        {
            [XXX_NEXT_DROP] = "error-drop",
            [XXX_NEXT_IP4_LOOKUP] = "ip4-lookup",
            [XXX_NEXT_IP6_LOOKUP] = "ip6-lookup",
        },
};
#endif /* CLIB_MARCH_VARIANT */
/* *INDENT-ON* */

/* *INDENT-OFF* */
#ifndef CLIB_MARCH_VARIANT
VLIB_REGISTER_NODE(xxx_v6) = {
    .name = "xxx-v6",
    .vector_size = sizeof(u32),
    .format_trace = format_hds_trace,
    .type = VLIB_NODE_TYPE_INTERNAL,

    .n_errors = ARRAY_LEN(hds_error_strings),
    .error_strings = xxx_error_strings,

    .n_next_nodes = XXX_N_NEXT,

    /* edit / add dispositions here */
    .next_nodes =
        {
            [XXX_NEXT_DROP] = "error-drop",
            [XXX_NEXT_IP4_LOOKUP] = "ip4-lookup",
            [XXX_NEXT_IP6_LOOKUP] = "ip6-lookup",
        },
};
#endif /* CLIB_MARCH_VARIANT */

VPP-Feature

VPP 内部业务逻辑是通过一系列的 Node 连接来实现的,这些 Node 通常在初始化时定义好,比如二层以太处理 ethernet-input,三层 ip4-input 等,通过初始化定义,将 Node 连接成一个有序的向量图,来实现 VPP 的业务功能。

而早期的 VPP 本身的 Node 框架比较固定,各个 Node 之间逻辑连接已经固定了。为此新版本增加了 Feature 机制,每个 Feature 是一个 Node,用户可以启用/停止某个或某些 Feature。用户也可以自己写插件,把自定义 Node(自己的业务逻辑)加入到指定位置。

VPP 中,将不同的 Feature 按照类型分成了不同的组,每组 Feature 称之为一个 Arc。Arc中的 Feature 按照代码指定的顺序串接起来。 Arc 结构中,记录这组 Feature 中的起始 Node 和结束 Node。系统初始化时,会完成初步的排序,但并没有应用到对应的接口中。

初始化

VNET_FEATURE_ARC_INIT 宏用来注册 vnet_feature_arc_registration_t 类型的 Arc,将其注册到 vnet_feature_main_t 类型的全局变量feature_main。每个 Arc 包含一组 Feature。

源码分析举例:

src/vnet/ip/ip4_forward.c:

VNET_FEATURE_ARC_INIT 宏进行注册 Arc

/* Built-in ip4 unicast rx feature path definition */
/* *INDENT-OFF* */
VNET_FEATURE_ARC_INIT (ip4_unicast, static) =
{
  .arc_name = "ip4-unicast",
    //这两个实际上在运行时只会有一个,通过配置指定运行哪一个node;一个计算ip校验和,一个不计算
  .start_nodes = VNET_FEATURES ("ip4-input", "ip4-input-no-checksum"),
  .last_in_arc = "ip4-lookup",
  .arc_index_ptr = &ip4_main.lookup_main.ucast_feature_arc_index,
};

VNET_FEATURE_ARC_INIT 宏展开

static vnet_feature_arc_registration_t vnet_feat_arc_ip4_unicast = {
  .arc_name = "ip4-unicast",
  .start_nodes = (char*[]) { "ip4-input", "ip4-input-no-checksum", 0},
  .last_in_arc = "ip4-lookup",
  .arc_index_ptr = &ip4_main.lookup_main.ucast_feature_arc_index,
};
/*__attribute__((constructor))在main函数之前,执行一个函数*/
static void __vnet_add_feature_arc_registration_ip4_unicast(void) __attribute__((__constructor__));
static void __vnet_add_feature_arc_registration_ip4_unicast(void)
{
    vnet_feature_main_t *fm = &feature_main;
    vnet_feat_arc_ip4_unicast.next = fm->next_arc;
    fm->next_arc = &vnet_feat_arc_ip4_unicast;
}
/*__attribute__((__destructor__)) 系统在main()函数退出或者调用了exit()之后,调用的函数*/
static void __vnet_rm_feature_arc_registration_ip4_unicast(void) __attribute__((__destructor__));
static void __vnet_rm_feature_arc_registration_ip4_unicast(void)
{
    vnet_feature_main_t *fm = &feature_main;
    vnet_feature_arc_registration_t *r = &vnet_feat_arc_ip4_unicast;
    {
        ASSERT(fm->next_arc);
        if (fm->next_arc == r)
            fm->next_arc = (r)->next;
        else
        {
            __typeof__(r) current = fm->next_arc;
            while (current->next)
            {
                if (current->next == r)
                {
                    current->next = current->next->next;
                    break;
                }
                current = current->next;
            }
            ASSERT(current);
        }
    }
}

Arc 类型:

image

每种 Arc 都对应了不同的一组功能,常用的如ip4-unicast(处理 IPv4 报文单播)、ip4-local(处理目的为自己的 IPv4 报文)、ip4-punt(处理 IPv4 本地上送报文)等。

添加 Feature 到指定的 Arc 里面

VNET_FEATURE_INIT 宏用来注册 vnet_feature_registration_t 类型的 Feature,将其注册到 vnet_feature_main_t 类型的全局变量 feature_main

VNET_FEATURE_INIT 宏注册 Feature

/* *INDENT-OFF* */
VNET_FEATURE_INIT(xxx_v4, static) =
    {
        .arc_name = "ip4-unicast",
        .node_name = "xxx-v4",
        .runs_before = VNET_FEATURES("ip4-lookup"),
};

VNET_FEATURE_INIT 宏展开

static vnet_feature_registration_t vnet_feat_xxx_v4 =
    {
        .arc_name = "ip4-unicast",
        .node_name = "xxx-v4",
        .runs_before = VNET_FEATURES("ip4-lookup"),
};
static void __vnet_add_feature_registration_xxx_v4(void) __attribute__((__constructor__));
static void __vnet_add_feature_registration_xxx_v4(void)
{
    vnet_feature_main_t *fm = &feature_main;
    vnet_feat_xxx_v4.next = fm->next_feature;
    fm->next_feature = &vnet_feat_xxx_v4;
}
static void __vnet_rm_feature_registration_xxx_v4(void) __attribute__((__destructor__));
static void __vnet_rm_feature_registration_xxx_v4(void)
{
    vnet_feature_main_t *fm = &feature_main;
    vnet_feature_registration_t *r = &vnet_feat_xxx_v4;
    {
        ASSERT(fm->next_feature);
        if (fm->next_feature == r)
            fm->next_feature = (r)->next;
        else
        {
            __typeof__(r) current = fm->next_feature;
            while (current->next)
            {
                if (current->next == r)
                {
                    current->next = current->next->next;
                    break;
                }
                current = current->next;
            }
            ASSERT(current);
        }
    }
}

vnet_feature_enable_disable

该函数就是将某个 Feature 开启或关闭

我们可以通过实现命令行调用这个函数来实现启动和关闭

vnet_feature_enable_disable("ip4-unicast", "xxx-v4", sw_if_index, enable_disable, 0, 0);
vnet_feature_enable_disable("ip6-unicast", "xxx-v6", sw_if_index, enable_disable, 0, 0);

vnet_feature_arc_start

该函数主要由 Arc 内的 start node 调用,在 start node 中通过该函数,进入到已经组织好的 Feature 的 Node 的矢量图中。

比如在 ipv4-unicast 这个 Arc 中,其 start node 之一是 ip4-input,那么在 ip4-input 的处理函数中,将会调用此函数进入到 Feature 的 Node 矢量图中。

总结

这篇我们主要是理解 Node、Feature、Arc 的三者的关系,搞清楚这个了,我们就能灵活的添加自己的业务在 VPP 里面了。