pdr代码分析

vpp的node框架

vpp通过node的级联组成graph,实现报文的流转处理。node是处理报文的最小逻辑单元,一个node代表着一个处理报文的逻辑。node的初始注册代码如下:

/*
 * Dispatch function of upf_pdr_detect_node, declared through the
 * VLIB_NODE_FN macro. It hands the frame to upf_pdr_detect() with the
 * is_ip4 argument fixed to 1 and returns that function's result.
 *
 * Notes carried over from the original author (VPP background, not
 * established by this snippet itself):
 *   vm         - vlib_main_t: global state such as statistics, the node
 *                graph, the CLI and registered functions.
 *   node       - vlib_node_runtime_t: the structure actually used while a
 *                node runs; records per-dispatch state changes.
 *   from_frame - vlib_frame_t: holds the buffer indices this node has to
 *                process in its trailing variable-length array.
 */
VLIB_NODE_FN (upf_pdr_detect_node)
(vlib_main_t *vm, vlib_node_runtime_t *node, vlib_frame_t *from_frame)
{
    const int is_ip4 = 1;
    uword n_handled = upf_pdr_detect (vm, node, from_frame, is_ip4);

    return n_handled;
}

/*
 * Graph-node registration for upf_pdr_detect_node.
 *
 * Background on node types, as described by the original author (not
 * verifiable from this snippet alone):
 *   VLIB_NODE_TYPE_INTERNAL  - the typical node: receives a vector of
 *                              buffers and operates on it; most vpp nodes
 *                              are of this type.
 *   VLIB_NODE_TYPE_INPUT     - input node, usually a device input; creates
 *                              frames from scratch and dispatches them to
 *                              internal nodes, in polling or interrupt mode.
 *   VLIB_NODE_TYPE_PRE_INPUT - runs before the input nodes; the original
 *                              note says it is currently unused
 *                              (TODO confirm).
 *   VLIB_NODE_TYPE_PROCESS   - thread-like node that can suspend, wait for
 *                              events and resume; unlike a pthread it is
 *                              built on setjmp/longjmp.
 */
VLIB_REGISTER_NODE (upf_pdr_detect_node) = {
    /* Node name; must be unique. */
    .name = "upf-pdr-detect",
    /* Node type. */
    .type = VLIB_NODE_TYPE_INTERNAL,
    .vector_size = sizeof (u32),
    /* Formatter used for this node's trace records. */
    .format_trace = format_get_pdrinfo,
    .error_strings = pdr_detect_error_strings,
    .n_errors = PDR_DETECT_N_ERROR,
    /* Number of next nodes this node can hand buffers to. */
    .n_next_nodes = PD_NEXT_N_NEXT,
    /* The next nodes themselves, indexed by the PD_NEXT_* values. */
    .next_nodes = {
        [PD_NEXT_PROCESS] = "upf-ip4-process",
        [PD_NEXT_DROP] = "error-drop",
    },
};

node中报文处理流程

vpp的每一个插件internal节点报文处理流程大致相同,下面函数省去了具体的业务处理,只暴露了报文处理框架:

/*
 * Packet-processing skeleton of the upf-pdr-detect node. The per-packet
 * business logic is omitted; only the frame handling is shown.
 *
 * vm      - vlib main structure handed in by the node function
 * node    - runtime state of this node
 * frame   - frame carrying the buffer indices to process
 * is_ip4  - non-zero: the counter is charged to upf_pdr_detect_node,
 *           otherwise to upf_pdr_detect6_node
 *
 * Returns frame->n_vectors.
 */
static uword
upf_pdr_detect (vlib_main_t *vm, vlib_node_runtime_t *node,
                vlib_frame_t *frame, int is_ip4)
{
  u32 n_left_from, *from, next_index, *to_next, n_left_to_next;

  /* Amount added to PDR_DETECT_ERROR_COUNTER once the frame is done. It was
     previously used without a declaration; the omitted per-packet business
     logic is what increments it. */
  u32 pkts_counter = 0;

  /* Start of the buffer-index vector received by this node. */
  from = vlib_frame_vector_args (frame);

  /* Number of buffer indices received by this node. */
  n_left_from = frame->n_vectors;

  /* Begin with the next index cached in the node runtime; it is an index
     into the .next_nodes table of the node registration. */
  next_index = node->cached_next_index;

  while (n_left_from > 0)
    {
      /* to_next: first free slot in the frame of the node selected by
         next_index; n_left_to_next: number of free slots left in it. */
      vlib_get_next_frame (vm, node, next_index, to_next, n_left_to_next);

      /* Handle one packet per iteration. */
      while (n_left_from > 0 && n_left_to_next > 0)
        {
          u32 bi0;

          /* Index of the next node chosen for this packet. */
          u32 next0 = PD_NEXT_DROP;

          vlib_buffer_t *b0;

          /* from[0] is the buffer index of the current packet. It is copied
             to to_next[0] right away, i.e. the packet is speculatively
             enqueued to the node selected by next_index;
             vlib_validate_buffer_enqueue_x1 below corrects this if next0
             turns out to be different. */
          bi0 = to_next[0] = from[0];

          /* Look up the vlib_buffer_t that belongs to the buffer index. */
          b0 = vlib_get_buffer (vm, bi0);

        /* Record this node's result in the packet trace. The omitted
           business logic jumps here with "goto trace". */
        trace:
          if (PREDICT_FALSE (b0->flags & VLIB_BUFFER_IS_TRACED))
            {
              upf_pdr_trace_t *tr =
                  vlib_add_trace (vm, node, b0, sizeof (*tr));
              tr->sx_idx = upf_buffer_opaque (b0)->upf.session_index;
              tr->pdr_idx = upf_buffer_opaque (b0)->upf.pdr_index;
              tr->next = next0;
            }

          /* Advance both vectors by one packet. n_left_from is the number of
             packets still to be processed from the incoming frame;
             n_left_to_next is the number of free slots remaining in the
             next node's frame. */
          from++;
          to_next++;
          n_left_from--;
          n_left_to_next--;

          /* next_index: the speculated next node.
             next0:      the next node actually chosen for this packet.
             If they match nothing else is needed. If they differ, the
             packet is taken back out of the next_index frame and put on
             the frame that belongs to next0. */
          vlib_validate_buffer_enqueue_x1 (vm, node, next_index, to_next,
                                           n_left_to_next, bi0, next0);
        }

      /* The next node's frame now holds the buffer indices handled in this
         pass; hand it back so that it can be scheduled (the original note
         says it is recorded in vlib_pending_frame_t). */
      vlib_put_next_frame (vm, node, next_index, n_left_to_next);
    }

  /* All packets of the frame are done: update the node counter. */
  if (is_ip4)
    {
      vlib_node_increment_counter (vm, upf_pdr_detect_node.index,
                                   PDR_DETECT_ERROR_COUNTER, pkts_counter);
    }
  else
    {
      vlib_node_increment_counter (vm, upf_pdr_detect6_node.index,
                                   PDR_DETECT_ERROR_COUNTER, pkts_counter);
    }

  return frame->n_vectors;
}

pdr核心业务流程

pdr

  1. 检测报文是否带有VNET_BUFFER_F_LOCALLY_ORIGINATED标志位,若有则直接进行pdr查找(默认为线性查找),且不创建flowcache表项,具体代码如下:

              /* Fragment of the per-packet loop body: handling of buffers
                 flagged VNET_BUFFER_F_LOCALLY_ORIGINATED. Per the original
                 note these are packets sent by the upf itself or coming from
                 the dn. The PDR is looked up directly, flowcache learning is
                 aborted and control jumps to the trace label. */
              if (PREDICT_FALSE (b0->flags & VNET_BUFFER_F_LOCALLY_ORIGINATED))
                {
                  /* Zero the opaque area before filling in the
                     upf_buffer_opaque_t fields. */
                  clib_memset (upf_buffer_opaque (b0), 0,
                               sizeof (upf_buffer_opaque_t));

                  /* The session index is taken from adj_index[VLIB_TX]. */
                  upf_buffer_opaque (b0)->upf.session_index =
                      vnet_buffer (b0)->ip.adj_index[VLIB_TX];

                  upf_buffer_opaque (b0)->upf.pdr_index = ~0;
                  upf_buffer_opaque (b0)->upf.flow_index = ~0;

                  /* Mark the source interface as SRC_INTF_CORE. */
                  upf_buffer_opaque (b0)->upf.src_intf = SRC_INTF_CORE;

                  upf_buffer_opaque (b0)->upf.data_offset = 0;
                  upf_debug ("this packet from local %p", b0);

                  /* If the TCP checksum offload flag is set, compute the
                     checksums here. */
                  if (b0->flags & VNET_BUFFER_F_OFFLOAD_TCP_CKSUM)
                    {
                      vnet_calc_checksums_inline (
                          vm, b0, b0->flags & VNET_BUFFER_F_IS_IP4,
                          b0->flags & VNET_BUFFER_F_IS_IP6);
                    }

                  /* Read back the session index stored above. */
                  sidx = upf_buffer_opaque (b0)->upf.session_index;

                  /* Fetch the session from the gtm->sessions pool.
                     NOTE(review): no pool_is_free_index() check precedes
                     this lookup - confirm sidx is always valid here. */
                  sess = pool_elt_at_index (gtm->sessions, sidx);

                  /* Use the SX_ACTIVE rule set. Per the original note,
                     SX_ACTIVE holds the rules pushed by the smf and
                     SX_PENDING the locally configured ones (TODO confirm). */
                  active = sx_get_rules (sess, SX_ACTIVE);

                  /* ACL-based search mode; per the original note it is not
                     used yet. */
                  if (upf_main.pdr_search_perf)
                    {
                      u32 is_eth = 0;
                      u8 *data = (u8 *)(vlib_buffer_get_current (b0) +
                                        upf_buffer_opaque (b0)->upf.data_offset);
                      if (sess->pdn_type == PDN_TYPE_ETHERNET)
                        {
                          /* Skip the ethernet header before testing the IP
                             version. */
                          is_eth = 1;
                          data += sizeof (ethernet_header_t);
                        }
                      if (is_v6_packet (data))
                        pdr_idx = upf_search_pdr6 (active->acl6, b0, is_eth);
                      else
                        pdr_idx = upf_search_pdr (active->acl, b0, is_eth);
                    }
                  else
                    /* Default: search_pdr() (linear search per the original
                       note). */
                    pdr_idx = search_pdr (active, b0);

                  if (pdr_idx != ~0)
                    {
                      /* PDR hit: store it and continue with the process
                         node. */
                      upf_buffer_opaque (b0)->upf.pdr_index = pdr_idx;
                      next0 = PD_NEXT_PROCESS;
                    }
                  else
                    {
                      upf_debug ("can not find the pdr\n");
                      next0 = PD_NEXT_DROP;
                    }
                  /* No flowcache entry is wanted for this kind of packet, so
                     learning is aborted. */
                  flowcache_abort_learning (b0);
                  goto trace;
                }
    
  2. 其它类型的报文先去做获取session和获取rules的基本操作,代码如下:

              /* Fragment of the per-packet loop body: fetch the session and
                 its active rules. Every early exit jumps to the trace label
                 without touching next0. */

              /* No session index was set on this buffer. */
              if (PREDICT_FALSE (upf_buffer_opaque (b0)->upf.session_index == ~0))
                {
                  goto trace;
                }

              /* Is the payload (current data + data_offset) an IPv4
                 packet? */
              u32 is_payload_v4 =
                  is_v4_packet ((u8 *)(vlib_buffer_get_current (b0) +
                                       upf_buffer_opaque (b0)->upf.data_offset));

              sidx = upf_buffer_opaque (b0)->upf.session_index;

              /* Fetch the session by sidx, unless that pool slot is free. */
              if (PREDICT_FALSE (pool_is_free_index (gtm->sessions, sidx)))
                goto trace;
              sess = pool_elt_at_index (gtm->sessions, sidx);

              /* Rules that belong to the session (SX_ACTIVE set). */
              active = sx_get_rules (sess, SX_ACTIVE);

              /* A session flagged SX_DELETING is not processed any
                 further. */
              if (PREDICT_FALSE (sess->flags & SX_DELETING))
                {
                  goto trace;
                }
    
  3. 若未命中flowtable,则仍走线性匹配,代码如下:

              /* Fragment of the per-packet loop body: the buffer's
                 flow_index does not refer to a live entry of fm->flows
                 (flowtable miss), so the PDR is looked up directly. */
              if (pool_is_free_index (fm->flows,
                                      upf_buffer_opaque (b0)->upf.flow_index))
                {
                  /* ACL-based search mode; per the original note it is not
                     used yet. */
                  if (upf_main.pdr_search_perf)
                    {
                      u32 is_eth = 0;
                      u8 *data = (u8 *)(vlib_buffer_get_current (b0) +
                                        upf_buffer_opaque (b0)->upf.data_offset);
                      if (sess->pdn_type == PDN_TYPE_ETHERNET)
                        {
                          /* Skip the ethernet header before testing the IP
                             version. */
                          is_eth = 1;
                          data += sizeof (ethernet_header_t);
                        }
                      if (is_v6_packet (data))
                        pdr_idx = upf_search_pdr6 (active->acl6, b0, is_eth);
                      else
                        pdr_idx = upf_search_pdr (active->acl, b0, is_eth);
                    }
                  else
                    pdr_idx = search_pdr (active, b0);

                  if (pdr_idx == ~0)
                    {
                      upf_debug ("can not find the pdr\n");
                      next0 = PD_NEXT_DROP;
                    }
                  else
                    {
                      upf_buffer_opaque (b0)->upf.pdr_index = pdr_idx;
                      /* For DNS packets, additionally run the DNS
                         application detection; per the original note it
                         tries to hit a higher-priority PDR by domain
                         name. */
                      if (upf_pdr_is_dns (b0, active, is_payload_v4))
                        upf_pdr_detect_application_detection_dns (vm, b0, active,
                                                                  is_payload_v4);
                      next0 = PD_NEXT_PROCESS;
                      pkts_counter++;
                    }
                  goto trace;
                }
    
  4. 若命中了flowtable,去确认这个包是否要转发,代码如下:

              /* Fragment of the per-packet loop body: the flow_index refers
                 to a live flowtable entry. Fetch that entry. */
              flow = pool_elt_at_index (fm->flows,
                                        upf_buffer_opaque (b0)->upf.flow_index);
              ASSERT (flow != NULL);

              /* is_forward is 1 when the packet's is_reverse value equals
                 the one recorded in the flow, 0 otherwise. */
              is_reverse = upf_buffer_opaque (b0)->upf.is_reverse;
              is_forward = (is_reverse == flow->is_reverse);

              /* Start from the PDR index cached in the flow for this
                 direction. */
              upf_buffer_opaque (b0)->upf.pdr_index = flow->pdr_index[is_reverse];

              upf_debug ("is_rev %u, is_fwd %d, flow id: %u, pdr_idx: %u, "
                         "flowcache_flag:%x\n",
                         is_reverse, is_forward,
                         upf_buffer_opaque (b0)->upf.flow_index,
                         upf_buffer_opaque (b0)->upf.pdr_index,
                         upf_buffer_opaque (b0)->upf.flowcache_flag);

              /* Reassembly handling; per the original note it is not
                 exercised. The "!= 0" comparison now sits inside
                 PREDICT_FALSE so that the branch hint covers the whole
                 condition; the value tested is the same as before. */
              if (sess->pdn_type != PDN_TYPE_ETHERNET &&
                  PREDICT_FALSE (vnet_buffer (b0)->ip.reass.next_index != 0))
                {
                  /* comes from reassemble */
                  flowcache_abort_learning (b0);
                  /* NOTE(review): relies on data_offset being promoted to a
                     signed type before negation - confirm its declared
                     type. */
                  vlib_buffer_advance (b0,
                                       -upf_buffer_opaque (b0)->upf.data_offset);
                }
    
  5. 根据判断条件,分情况处理,代码如下:

              /* Fragment of the per-packet loop body: choose how the PDR is
                 determined for a packet that hit the flowtable.

                 Case 1: the flow has no cached PDR for this direction, or
                 upf_check_flowtable_update_timestamp() reports that the
                 entry must be refreshed - look the PDR up again. */
              if ((upf_buffer_opaque (b0)->upf.pdr_index == ~0) ||
                  (upf_check_flowtable_update_timestamp (flow, current_time)))
                {
                  if (upf_main.pdr_search_perf)
                    {
                      u32 is_eth = 0;
                      u8 *data = (u8 *)(vlib_buffer_get_current (b0) +
                                        upf_buffer_opaque (b0)->upf.data_offset);
                      if (sess->pdn_type == PDN_TYPE_ETHERNET)
                        {
                          /* Skip the ethernet header before testing the IP
                             version. */
                          is_eth = 1;
                          data += sizeof (ethernet_header_t);
                        }
                      if (is_v6_packet (data))
                        pdr_idx = upf_search_pdr6 (active->acl6, b0, is_eth);
                      else
                        pdr_idx = upf_search_pdr (active->acl, b0, is_eth);
                    }
                  else
                    pdr_idx = search_pdr (active, b0);

                  if (pdr_idx == ~0)
                    {
                      upf_debug ("can not find the pdr\n");
                      /* Invalidate the index copied from the flow. When this
                         branch was entered because of the timestamp check,
                         upf.pdr_index still holds the old cached value; the
                         flowtable-insertion step of this walkthrough tests
                         upf.pdr_index != ~0 and would turn the drop below
                         back into PD_NEXT_PROCESS with that stale PDR.
                         NOTE(review): flow->pdr_index[is_reverse] is left
                         untouched - confirm whether it should be cleared
                         as well. */
                      upf_buffer_opaque (b0)->upf.pdr_index = ~0;
                      next0 = PD_NEXT_DROP;
                    }
                  else
                    {
                      upf_debug ("hit the pdr idx %d\n", pdr_idx);
                      upf_buffer_opaque (b0)->upf.pdr_index = pdr_idx;
                      if (upf_pdr_is_dns (b0, active, is_payload_v4))
                        upf_pdr_detect_application_detection_dns (vm, b0, active,
                                                                  is_payload_v4);
                      /* pkts_counter is deliberately not incremented here:
                         the flowtable-insertion step of this walkthrough
                         already counts every packet whose upf.pdr_index is
                         valid, so counting here as well would count this
                         packet twice. */
                    }
                }
              /* Case 2: packet with is_forward set that is either a request
                 (per the original note: an http request), or carries a
                 non-zero vnet_buffer2()->__unused2[0] (per the original
                 note: already classified by the ndpi module), or for which
                 upf_inflate_hd() returns headers. */
              else if (is_forward &&
                       (upf_pdr_is_request (vm, b0, flow, active, is_payload_v4) ||
                        vnet_buffer2 (b0)->__unused2[0] ||
                        /* Per the original note upf_inflate_hd is not in use
                           yet and always returns NULL. */
                        ((headers = upf_inflate_hd (flow, b0)) != NULL)))
                {
                  upf_debug ("Forward Flow app \n");

                  upf_pdr_detect_application_detection (
                      vm, b0, flow, active, is_payload_v4,
                      &flowcache_abort_learning_flag, headers);
                  if (headers)
                    vec_free (headers);

                  /* For an https client_hello, run the SNI-based detection
                     as well; per the original note this is a second PDR
                     match aiming at a more specific service. */
                  if (upf_buffer_opaque (b0)->upf.https_client_hello)
                  {
                    upf_pdr_detect_application_detection_sni (vm, b0, flow, active,
                      is_payload_v4, &flowcache_abort_learning_flag);
                  }
                }
              /* Case 3: is_forward is clear and the flow already has an
                 application id - fetch the matching application rule. */
              else if (!is_forward && flow->application_id != ~0)
                {
                  upf_debug ("Reverse Flow and Appid_idx %u\n",
                             flow->application_id);
                  upf_pdr_detect_get_application_rule (
                      vm, b0, flow, active, is_payload_v4,
                      &flowcache_abort_learning_flag);
                }
              /* Case 4: more than 4096 bytes were seen in both directions -
                 clear flowcache_abort_learning_flag. Per the original note,
                 the flowtable entry is then established and later packets
                 take the fast path instead of this one. */
              else if (flow->stats[0].bytes > 4096 && flow->stats[1].bytes > 4096)
                {
                  /* stop flow classification after 4k in each direction */
                  upf_debug ("####Stopping PDR detect after 4k in each direction "
                             "for flow id: 0x%x",
                             upf_buffer_opaque (b0)->upf.flow_index);
                  flowcache_abort_learning_flag = 0;
                }
    
  6. 添加进flowtable表项,代码如下:

              /* Fragment of the per-packet loop body: finish the flow-hit
                 path. Unless flowcache_abort_learning_flag has been cleared
                 to 0, flowcache learning is aborted for this buffer. */
              if (1 == flowcache_abort_learning_flag)
                flowcache_abort_learning (b0);

              /* A valid PDR index was determined: send the packet to the
                 process node, count it, cache the index in the flow for
                 this direction and register the flow with the PDR. */
              if (upf_buffer_opaque (b0)->upf.pdr_index != ~0)
                {
                  next0 = PD_NEXT_PROCESS;
                  pkts_counter++;

                  flow->pdr_index[is_reverse] =
                      upf_buffer_opaque (b0)->upf.pdr_index;
                  upf_add_flow_id_to_pdr (
                      sess, upf_buffer_opaque (b0)->upf.pdr_index,
                      upf_buffer_opaque (b0)->upf.flow_index, b0, is_reverse);
                }