基于 VPP-Agent MP2 开发部署

MP2 VPP-Agent 架构

VPP Agent structure

MP2 VPP-Agent 部署

Redis 安装

$ apt-get install redis
$ systemctl enable redis

redis 配置 /etc/redis/redis.conf

bind 127.0.0.1
protected-mode yes
port 6379
tcp-backlog 511
timeout 0
tcp-keepalive 300
daemonize yes
supervised no
pidfile /var/run/redis/redis-server.pid
loglevel notice
logfile /var/log/redis/redis-server.log
databases 16
always-show-logo yes
save 900 1
save 300 10
save 60 10000
stop-writes-on-bgsave-error yes
rdbcompression yes
rdbchecksum yes
dbfilename dump.rdb
dir /var/lib/redis
slave-serve-stale-data yes
slave-read-only yes
repl-diskless-sync no
repl-diskless-sync-delay 5
repl-disable-tcp-nodelay no
slave-priority 100
lazyfree-lazy-eviction no
lazyfree-lazy-expire no
lazyfree-lazy-server-del no
slave-lazy-flush no
appendonly no
appendfilename "appendonly.aof"
appendfsync everysec
no-appendfsync-on-rewrite no
auto-aof-rewrite-percentage 100
auto-aof-rewrite-min-size 64mb
aof-load-truncated yes
aof-use-rdb-preamble no
lua-time-limit 5000
slowlog-log-slower-than 10000
slowlog-max-len 128
latency-monitor-threshold 0
notify-keyspace-events "AKE"
hash-max-ziplist-entries 512
hash-max-ziplist-value 64
list-max-ziplist-size -2
list-compress-depth 0
set-max-intset-entries 512
zset-max-ziplist-entries 128
zset-max-ziplist-value 64
hll-sparse-max-bytes 3000
activerehashing yes
client-output-buffer-limit normal 0 0 0
client-output-buffer-limit slave 256mb 64mb 60
client-output-buffer-limit pubsub 32mb 8mb 60
hz 10
aof-rewrite-incremental-fsync yes

启动 redis

$ systemctl start redis

VPP-Agent 安装

裸机部署

可以直接拷贝二进制文件;描述一下编译部分,前提机器已经有 GO 环境。

$ git clone -b vpp-agent-mp2 git@gitlab.sh.99cloud.net:5GS/vpp-agent.git
$ cd vpp-agent/cmd/vpp-agent
$ go build -mod=vendor

生成二进制文件 vpp-agent

VPP-Agent 配置目录 /etc/vpp-agent/

http.conf,由于 MP2 要求 HTTPS,所以设置密钥。

endpoint: 127.0.0.1:9191
server-cert-file: /root/https-server.crt
server-key-file: /root/https-server.key

vpp-upfplugin.conf

publishers:
  - redis

govpp.conf

binapi-socket-path: /run/vpp/api.sock

启动 vpp-agent

$ ./cmd/vpp-agent/vpp-agent -config-dir=/etc/vpp-agent -http-config=/etc/vpp-agent/http.conf

容器部署

待补充完善

具体在仓库中添加 Dockerfile,和配置信息。

Dockerfile

FROM ubuntu:18.04

RUN apt-get update && apt-get install -y --no-install-recommends \
        # general tools
        inetutils-traceroute \
        iproute2 \
        iputils-ping \
        # vpp requirements
        ca-certificates \
        libapr1 \
        libc6 \
        libmbedcrypto1 \
        libmbedtls10 \
        libmbedx509-0 \
        libnuma1 \
        openssl \
    && rm -rf /var/lib/apt/lists/*

COPY etc/*.conf /etc/vpp-agent/

COPY https-server.* /root/

COPY cmd/vpp-agent /root/

ENV INITIAL_LOGLVL=DEBUG

CMD ./vpp-agent -config-dir=/etc/vpp-agent -http-config=/etc/vpp-agent/http.conf

将配置 放在 etc/目录下

拉取代码并 go build 如裸机部署部分。

// build 生成 image
$ docker build -f Dockerfile       --tag vpp-agent:v1.0 .

启动容器

$ docker run -itd -p 9191:9191 --name vpp-agent -v /run/vpp:/run/vpp --net=host vpp-agent:v1.0

MP2 VPP-Agent 开发

代码生成部分

分为三个部分

  • 北向 Restful API 的 Proto,pfcp.pb.go 通过 pfcp.proto 生成

    $ make generate-proto
    

    该命令会更新所有 vpp-agent 的 *.pb.go 代码

  • descriptor 中的 adapter,通过 descriptor 生成,示例生成 MP2TrafficRule 部分。

    $ descriptor-adapter --descriptor-name MP2TrafficRule --value-type *vpp_upf.MP2TrafficRule --import go.ligato.io/vpp-agent/v3/proto/ligato/vpp/upf --output-dir descriptor
    
  • 南向接口的 UPF API 的结构体,编译安装 UPF 后会生成对应的 upf.api.json,通过 upf.api.json 生成。

    $ binapi-generator --no-version-info --output-dir=plugins/vpp/binapi/vpp2009/ --input-file=/usr/share/vpp/api/plugins/upf.api.json
    

Redis 操作

UPF Plugin 实例化了 Redis,并规定前缀为 /vnf-agent/vpp1/

upfplugin.go Init()

p.RedisBroker = p.RedisPlugin.KvPlugin.NewBroker(p.RedisPlugin.ServiceLabel.GetAgentPrefix())

如果其他组件需要操作 Redis,则需要将 plugin 的 RedisBroker 传入。例如后续的 descriptor

p.mp2TrafficRuleDescriptor = descriptor.NewMp2TrafficRuleDescriptor(p.upfHandler, p.Log, p.RedisBroker)

redisBroker 有以下操作方法

GetValue(key string, reqObj proto.Message) (found bool, revision int64, err error)
// ListValues returns an iterator that enables to traverse all items stored
// under the provided <key>.
ListValues(key string) (ProtoKeyValIterator, error)
// ListKeys returns an iterator that allows to traverse all keys from data
// store that share the given <prefix>.
ListKeys(prefix string) (ProtoKeyIterator, error)
// Delete removes data stored under the <key>.
Delete(key string, opts ...datasync.DelOption) (existed bool, err error)

redisBroker 中关于 UUID 的设计

由于 MP2 中 UUID 由 32 位字符串组成,但 UPF 不希望维护 32 位 UUID 的表。所以 UUID 和传入 UPF 的索引之间的关系由 VPP-Agent 来维护。

Redis 中前缀为 /vnf-agent/vpp1/uuid_index/available/ 和 /vnf-agent/vpp1/uuid_index/inuse/ 则为这种逻辑服务,VPP-Agent 启动时,查看 inuse 的数量,确认下个 Index 的值。当 inuse 中删除,会将 Index 存入 available 中,可以其他资源使用。当新建资源时,优先使用 available 中的 Index,没有则使用下一个 Index。

这里以创建一个 trafficRule 举例,VPP-Agent 创建一个 TrafficRule 资源时,首先从 /vnf-agent/vpp1/uuid_index/available/ 中读取,查看是否有可用的 Index,没有的情况下,会创建两条 redis 数据库;第一条为 trafficRule 数据,/vnf-agent/vpp1/config/vpp/upf/v2/mp2/trafficrule/094e2e22-7dcf-451d-8c0f-d725f7cd9c00,第二条为 UUID 094e2e22-7dcf-451d-8c0f-d725f7cd9c00 对应的 Index,/vnf-agent/vpp1/uuid_index/inuse/094e2e22-7dcf-451d-8c0f-d725f7cd9c00;删除时,删除第一条 trafficRule 数据,将 inuse 中的 Index 放到 available 里,/vnf-agent/vpp1/uuid_index/available/094e2e22-7dcf-451d-8c0f-d725f7cd9c00。

代码实现如下

UPF Plugin 初始化时通过 getUUIDNum 来获取当前的所有 Index,如果需要分配新的 Index 时,则从该值后分配。

func (p *UpfPlugin) getUUIDNum() (uint32, error) {
	var UUIDInUseNum, UUIDAvailbleNum uint32
	UUIDInUseNum, UUIDAvailbleNum = 0, 0
	iter, err := p.RedisBroker.ListValues(uuidIndexForUpfInUse)
	for {
		_, stop := iter.GetNext()
		if stop {
			break
		}
		UUIDInUseNum++
	}
	iter, err = p.RedisBroker.ListValues(uuidIndexForUpfAvailable)
	for {
		_, stop := iter.GetNext()
		if stop {
			break
		}
		UUIDAvailbleNum++
	}
	return UUIDInUseNum + UUIDAvailbleNum, nil
}

当创建资源时

rest_api.go 资源创建函数中

_, err = p.createUUIDIndex(mp2TrafficRuleReqInfo.TrafficRuleId)
func (p *UpfPlugin) createUUIDIndex(uuid string) (uint32, error) {
	// 查看可用值里是否有 Index
	iter, err := p.RedisBroker.ListValues(UUIDIndexForUpfAvailable)

	UUIDIndex := &models.UUIDIndex{}
	kv, stop := iter.GetNext()
	if stop {
		// 如果可用值里没有则从初始化时获取到的 Index 后一个值开始分配
		p.publishLock.Lock()
		UUIDSum++
		UUIDIndex.Index = UUIDSum
		// 使用新值
		err = p.RedisBroker.Put(UUIDIndexForUpfInUse + uuid, UUIDIndex)
		p.publishLock.Unlock()
		return UUIDIndex.Index, nil
	} else {
		// 如果可用值里有, 则将该 Index 删除,放到 inuse 中
		p.publishLock.Lock()
		_, err := p.RedisBroker.Delete(kv.GetKey())
		if err != nil {
			return MaxUUIDSum, err
		}
		err = kv.GetValue(UUIDIndex)
		if err != nil {
			return MaxUUIDSum, err
		}
		err = p.RedisBroker.Put(UUIDIndexForUpfInUse + uuid, UUIDIndex)
		if err != nil {
			return MaxUUIDSum, err
		}
		p.publishLock.Unlock()
		return UUIDIndex.Index, nil
	}
}

当资源删除时

descriptor 中的 delete 触发

utils.go

// redis move UUID_index from inuse to available
func deleteUUIDIndex(RedisBroker keyval.ProtoBroker, uuid string) error {
	UUIDIndex := &models.UUIDIndex{}
	_, _, err := RedisBroker.GetValue(UUIDIndexForUpfInUse + uuid, UUIDIndex)
	if err != nil {
		return err
	}

// 从 InUse 中删除,放到 Available 内。
	_, err = RedisBroker.Delete(UUIDIndexForUpfInUse + uuid)
	if err != nil {
		return  err
	}
	err = RedisBroker.Put(UUIDIndexForUpfAvailable + uuid, UUIDIndex)
	if err != nil {
		return  err
	}

	return nil
}

Restful API 开发

VPP-Agent 已经使用 gorilla 作为 web 工具,我们直接使用它

在 upfplugin 中增加 rest_api.go,提供方法的注册和处理函数

func (p *UpfPlugin) registerHandlers(http rest.HTTPHandlers) {
	http.RegisterHTTPHandler(*path, *func, *action)
}

func (p *UpfPlugin) *func(formatter *render.Render) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		...
	}
}

在 upfplugin 启动后调用上面的注册

func (p *UpfPlugin) AfterInit() error {
	p.registerHandlers(p.HTTPHandlers)
}

在上面处理函数中,实现对资源数据的操作

func (p *UpfPlugin) mp2PostTrafficRuleHandler(formatter *render.Render) http.HandlerFunc {
	return func(w http.ResponseWriter, req *http.Request) {
		body, err := ioutil.ReadAll(req.Body)

		vars := mux.Vars(req)

		var mp2TrafficRuleReqInfo models.MP2TrafficRule
		if err = json.Unmarshal(body, &mp2TrafficRuleReqInfo); err != nil {
		}

// check body
		err = checkPathAvailable(vars, mp2TrafficRuleReqInfo)

		var found bool
		MP2TrafficRuleInfo := &models.MP2TrafficRule{}
		found, _, err = p.RedisBroker.GetValue(trafficRuleKvPrefix + mp2TrafficRuleReqInfo.TrafficRuleId,
			MP2TrafficRuleInfo)

		if "POST" == req.Method {
			if found {
				return
			}
			_, err = p.createUUIDIndex(mp2TrafficRuleReqInfo.TrafficRuleId)
		} else {}

// write into redis
		err = p.RedisBroker.Put(trafficRuleKvPrefix + mp2TrafficRuleReqInfo.TrafficRuleId, &mp2TrafficRuleReqInfo)

		return
	}
}

至此,VPP-Agent 北向内容基本完成。

南向触发和操作

加载 UPF Plugin 时,创建 GoVppmux 的channel 和 handler

vpp-agent/plugins/vpp/upfplugin/upfplugin.go

	if p.vppCh, err = p.GoVppmux.NewAPIChannel(); err != nil {
		return errors.Errorf("failed to create GoVPP API channel: %v", err)
	}

	p.vpeHandler, err = vpevppcalls.NewHandler(p.GoVppmux)
	if err != nil {
		return errors.Errorf("VPP core handler error: %w", err)
	}

注册 KV 的前缀并 watch

vpp-agent/plugins/vpp/upfplugin/upfplugin.go

	p.mp2TrafficRuleDescriptor = descriptor.NewMp2TrafficRuleDescriptor(p.upfHandler, p.Log, p.RedisBroker)

	err = p.Deps.Scheduler.RegisterKVDescriptor(p.*)

	p.vppNotificationCh.* = make(chan govppapi.Message, notificationChannelSize)

	if err := p.subscribeWatcher(); err != nil {
		return err
	}

	go p.watchVPPNotifications()

其中 p.mp2TrafficRuleDescriptor= descriptor.NewMp2TrafficRuleDescriptor(p.upfHandler, p.Log) 定义了具体资源(Nwi)的实现方法,其中有对资源的操作。

其中 err = p.Deps.Scheduler.RegisterKVDescriptor(p.*) 是注册匹配前缀的实现方法

redis 数据 key 的实现

descriptor 中

typedDescr := &adapter.MP2TrafficRuleDescriptor{
		Name:                 MP2TrafficRuleDescriptorName,
		NBKeyPrefix:          model.ModelMP2TrafficRule.KeyPrefix(),
		ValueTypeName:        model.ModelMP2TrafficRule.ProtoName(),
		KeySelector:          model.ModelMP2TrafficRule.IsKeyValid,
		KeyLabel:             model.ModelMP2TrafficRule.StripKeyPrefix,
		}

model 定义

ModelMP2TrafficRule = models.Register(&MP2TrafficRule{}, models.Spec{
   Module:  ModuleName,
   Version: "v2",
   Type:    "mp2.trafficrule",
}, models.WithNameTemplate(`{{.TrafficRuleId}}`))

vpp/upf/v2/mp2/trafficrule/{TrafficRuleId},当修改 redis 后,MP2TrafficRuleDescriptor 收到操作指令。

Descriptor 的操作

在 Descriptor 中实现了 通过 vppHandler 进行 CRUD 的操作

// Create creates new value.
func (d *MP2TrafficRuleDescriptor) Create(key string, value *model.MP2TrafficRule) (metadata interface{}, err error) {
	if GetHAStatus() == HAStatusMaster {
		d.log.Info("HA-STATUS is HAStatusMaster, do nothing")
		return nil, nil
	}
	//  upf_vppcalls 实现的 govppapi.Channel  UPF API 接口
	err = d.vppHandler.VppUpdateMP2TrafficRule(value, 0)
	if err != nil {
		d.log.Error(err)
	}
	return nil, nil
}

upf_vppcalls.go

func (h *UpfVppHandler) VppUpdateMP2TrafficRule(entry *models.MP2TrafficRule, mp2Id uint32) error {
   var err error

   request := &upf.UpfMp2IPTrafficRule{
      IsAdd: 1,
      UpfTrafficruleID: mp2Id,
      AppID: []byte(entry.AppName),
      NPfdContents: uint32(len(entry.TrafficFilter)),
   }
   reply := &upf.UpfMp2IPTrafficRuleReply{}

   var upfActionString string
   upfActionString, err = parseActionForUPF(entry.Action)
   if err != nil {return nil}
   for i := 0; i < len(entry.TrafficFilter); i++ {
      var upfProto = "ip"
      if entry.TrafficFilter[0].Protocol != nil {
         upfProto, err = parseProtocol(entry.TrafficFilter[0].Protocol[0])
         if err != nil {return err}
      }

      var srcAddress, dstAddress, srcPort, dstPort = "", "", "", ""
      if entry.TrafficFilter[0].SrcAddress != nil {srcAddress = entry.TrafficFilter[0].SrcAddress[0]}
      if entry.TrafficFilter[0].DstAddress != nil {dstAddress = entry.TrafficFilter[0].DstAddress[0]}
      if entry.TrafficFilter[0].SrcPort != nil {srcPort = entry.TrafficFilter[0].SrcPort[0]}
      if entry.TrafficFilter[0].DstPort != nil {dstPort = entry.TrafficFilter[0].DstPort[0]}

      filterRuleStr := fmt.Sprintf("%s in %s from %s %s to %s %s",
         upfActionString, upfProto, srcAddress, srcPort, dstAddress, dstPort)

      request.Mp2TrafficRuleContent[i].Flags = 1
      request.Mp2TrafficRuleContent[i].FlowDescription = []byte(filterRuleStr)
   }

   h.log.Debugf("send request to upf %+v\n", request)
   err = h.callsChannel.SendRequest(request).ReceiveReply(reply)
   if err != nil {
      return errors.Errorf("VppUpdateMP2TrafficRule error %v", err)
   }

   return nil
}

其中 UPF API 接口定义结构体如下

upf.ba.go

type UpfMp2IPTrafficRule struct {
	IsAdd                 uint32        `binapi:"u32,name=is_add" json:"is_add,omitempty"`
	DelAll                uint32        `binapi:"u32,name=del_all" json:"del_all,omitempty"`
	UpfTrafficruleID      uint32        `binapi:"u32,name=upf_trafficrule_id" json:"upf_trafficrule_id,omitempty"`
	AppID                 []byte        `binapi:"u8[32],name=app_id" json:"app_id,omitempty"`
	NPfdContents          uint32        `binapi:"u32,name=n_pfd_contents" json:"n_pfd_contents,omitempty"`
	Mp2TrafficRuleContent [8]PfdContent `binapi:"pfd_content[8],name=mp2_traffic_rule_content" json:"mp2_traffic_rule_content,omitempty"`
}

type PfdContent struct {
	Flags           uint32 `binapi:"u32,name=flags" json:"flags,omitempty"`
	FlowDescription []byte `binapi:"u8[128],name=flow_description" json:"flow_description,omitempty"`
	URL             []byte `binapi:"u8[32],name=url" json:"url,omitempty"`
	Domain          []byte `binapi:"u8[16],name=domain" json:"domain,omitempty"`
	Custom          []byte `binapi:"u8[8],name=custom" json:"custom,omitempty"`
}

使用 upf_vppcalls 时的结构体如下

type MP2TrafficRule struct {
   state         protoimpl.MessageState
   sizeCache     protoimpl.SizeCache
   unknownFields protoimpl.UnknownFields

   MepId         string              `protobuf:"bytes,1,opt,name=mepId,proto3" json:"mepId,omitempty"`
   AppName       string              `protobuf:"bytes,2,opt,name=appName,proto3" json:"appName,omitempty"`
   TrafficRuleId string              `protobuf:"bytes,3,opt,name=trafficRuleId,proto3" json:"trafficRuleId,omitempty"`
   Priority      uint32              `protobuf:"varint,4,opt,name=priority,proto3" json:"priority,omitempty"`
   Action        string              `protobuf:"bytes,5,opt,name=action,proto3" json:"action,omitempty"`
   TrafficFilter []*MP2TrafficFilter `protobuf:"bytes,6,rep,name=trafficFilter,proto3" json:"trafficFilter,omitempty"`
}

type MP2TrafficFilter struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	IpAddressType string   `protobuf:"bytes,1,opt,name=ipAddressType,proto3" json:"ipAddressType,omitempty"`
	SrcAddress    []string `protobuf:"bytes,2,rep,name=srcAddress,proto3" json:"srcAddress,omitempty"`
	DstAddress    []string `protobuf:"bytes,3,rep,name=dstAddress,proto3" json:"dstAddress,omitempty"`
	SrcPort       []string `protobuf:"bytes,4,rep,name=srcPort,proto3" json:"srcPort,omitempty"`
	DstPort       []string `protobuf:"bytes,5,rep,name=dstPort,proto3" json:"dstPort,omitempty"`
	Protocol      []string `protobuf:"bytes,6,rep,name=protocol,proto3" json:"protocol,omitempty"`
}

由于结构体不同,而且 TrafficRule 为五元组,而 UPF 需要 fd,则还需要进行翻译工作,即 upf_vppcalls.go 给 request 赋值前进行的解析设置操作。

request 通过 h.callsChannel.SendRequest(request).ReceiveReply(reply) 发送和收取 reply 进行 UPF 操作和结果查询。后续丰富根据 reply 报错分析操作的失败原因等。

以上完成逻辑闭环。