VPP Agent 调用分析

VPP Agent 可以通过三种方式被调用,分别是 RESTful API、gRPC 和 KV 数据库 watch。

call-analysis

KV 数据库管理

通过直接操作 KV 数据库来管理 VPP Agent:当用户通过 etcdctl 或 redis-cli 修改 VPP Agent 所 watch 的前缀下的资源时,etcd 或 redis 会通过 channel 通知 VPP Agent 下发任务并执行相应操作。

下面以通过 redis-cli 下发为例,分析整个流程。

Agent 初始化

/vpp-agent/cmd/vpp-agent/app/vpp_agent.go

定义 redis 的数据同步(datasync)插件及其 writers 和 watchers,并为 orchestrator 和 upf plugin 的 Watcher 和 Publisher 指定对应的数据库。

    // Data-sync plugin built with redis.DefaultPlugin as its KV store.
    redisDataSync := kvdbsync.NewPlugin(kvdbsync.UseKV(&redis.DefaultPlugin))

    // A composite literal spread over several lines needs a trailing comma
    // after its last element; without it the snippet does not parse.
    writers := datasync.KVProtoWriters{
    	redisDataSync,
    }

    watchers := datasync.KVProtoWatchers{
    	redisDataSync,
    }

    // The orchestrator watches through redis and publishes status to it.
    orchestrator.DefaultPlugin.Watcher = watchers
    orchestrator.DefaultPlugin.StatusPublisher = writers

    // The UPF plugin is wired to the same redis-backed data sync.
    upfplugin.DefaultPlugin.KVdbWatcher = watchers
    upfplugin.DefaultPlugin.RedisPlugin = redisDataSync

启动后加载所有的 Plugin

UPF Plugin

加载 UPF Plugin 时,创建 GoVppmux 的 channel 和 handler

vpp-agent/plugins/vpp/upfplugin/upfplugin.go

	if p.vppCh, err = p.GoVppmux.NewAPIChannel(); err != nil {
		return errors.Errorf("failed to create GoVPP API channel: %v", err)
	}
	
	p.vpeHandler, err = vpevppcalls.NewHandler(p.GoVppmux)
	if err != nil {
		return errors.Errorf("VPP core handler error: %w", err)
	}

注册 KV 的前缀并 watch

vpp-agent/plugins/vpp/upfplugin/upfplugin.go

	p.statusDescriptor = descriptor.NewHAStatusDescriptor(p.upfHandler, p.Log)
	p.associationDescriptor = descriptor.NewPfcpAssociationDescriptor(p.upfHandler, p.Log)
	p.pfdDescriptor = descriptor.NewPfcpPfdManagementDescriptor(p.upfHandler, p.Log)
	p.sessionDescriptor = descriptor.NewPfcpSessionDescriptor(p.upfHandler, p.Log)
	p.pdrDescriptor = descriptor.NewPfcpPdrDescriptor(p.upfHandler, p.Log)
	p.farDescriptor = descriptor.NewPfcpFarDescriptor(p.upfHandler, p.Log)
	p.qerDescriptor = descriptor.NewPfcpQerDescriptor(p.upfHandler, p.Log)
	p.urrDescriptor = descriptor.NewPfcpUrrDescriptor(p.upfHandler, p.Log)
	p.barDescriptor = descriptor.NewPfcpBarDescriptor(p.upfHandler, p.Log)
	p.gtpuendpointDescriptor = descriptor.NewGtpuEndpointDescriptor(p.upfHandler, p.Log)
	p.pfcpendpointDescriptor = descriptor.NewPfcpEndpointDescriptor(p.upfHandler, p.Log)
	p.nwiDescriptor = descriptor.NewNwiDescriptor(p.upfHandler, p.Log)
	
	err = p.Deps.Scheduler.RegisterKVDescriptor(p.*)
	
	p.vppNotificationCh.* = make(chan govppapi.Message, notificationChannelSize)
	
	if err := p.subscribeWatcher(); err != nil {
		return err
	}
	
	go p.watchVPPNotifications()

其中 p.nwiDescriptor = descriptor.NewNwiDescriptor(p.upfHandler, p.Log) 定义了具体资源(Nwi)的实现方法,其中有对资源的操作。

其中 err = p.Deps.Scheduler.RegisterKVDescriptor(p.*) 用于向 Scheduler 注册与 KV 前缀匹配的实现方法(p.* 表示上面创建的各个 descriptor)。

WatchVPPNotification

vpp-agent/plugins/vpp/upfplugin/vpp_notification.go

	toRedisTxn := &WaitTxn{Txn: p.RedisBroker.NewTxn(), Count: 0}

	for {
		select {
		case e := <-p.*:
			p.*(&e)
		...
		}
	
		commited := toRedisTxn.Commit(false, p.Log)
	
		if commited == true {
			toRedisTxn = &WaitTxn{Txn: p.RedisBroker.NewTxn(), Count: 0}
		}
	}

在启动后会做一次同步

vpp-agent/cmd/vpp-agent/app/vpp_agent.go

// AfterInit manually starts the resync; per the inline comment this is done
// once all plugins have started. It always returns nil.
func (a *VPPAgent) AfterInit() error {
	// manually start resync after all plugins started
	resync.DefaultPlugin.DoResync()
	// A function with a result type must end in a terminating statement.
	return nil
}

orchestrator

orchestrator plugin

orchestrator plugin 启动后,去 watchEvents

vpp-agent/plugins/orchestrator/orchestrator.go

// AfterInit starts the goroutine that watches datasync events.
// As shown here it always returns nil.
func (p *Plugin) AfterInit() (err error) {
	// watch datasync events
	// Bump the counter before launching the goroutine
	// (presumably p.wg is a sync.WaitGroup — confirm).
	p.wg.Add(1)
	go p.watchEvents()
	// Even with a named result the function must end in a return statement.
	return nil
}

当修改数据库后,通过 channel 告知修改

vpp-agent/plugins/orchestrator/orchestrator.go

func (p *Plugin) watchEvents() {
	for {
		select {
		case e := <-p.changeChan:
			p.log.Debugf("=> received CHANGE event (%v changes)", len(e.GetChanges()))
			
		......
			p.PushData(ctx, kvPairs)

			for _, v := range upfEvents {
				go p.handleUPFEvent(v.Prefix, v.Iter, e.GetContext(), &waitsession)
			}
	}
}

PushData

vpp-agent/plugins/orchestrator/dispatcher.go

seqID, err := txn.Commit(ctx)

由 txn 匹配 CRUD 类型并执行对应的操作方法,最终调用

vpp-agent/plugins/kvscheduler/txn_exec.go

func (s *Scheduler) applyValue

分析后,指向 vpp-agent/plugins/vpp/upfplugin/descriptor/nwi.go

// Create handles creation of the NWI value stored under key.
//
// If the HA status is HAStatusMaster, nothing is sent to VPP. Otherwise the
// value is handed to the VPP handler through VppUpdateNwi. A handler failure
// is only logged; the function returns (nil, nil) in every case.
//
// NOTE(review): success is reported even when VppUpdateNwi fails, so the
// caller never sees the error — confirm this best-effort behavior is intended.
func (d *NwiDescriptor) Create(key string, value *model.UpfNwiAddDel) (metadata interface{}, err error) {
	d.log.Debugf("NwiDescriptor Create: key %v value %+v\n", key, value)

	if GetHAStatus() != HAStatusMaster {
		// Push the value to VPP; failures are logged, not returned.
		if updateErr := d.vppHandler.VppUpdateNwi(value); updateErr != nil {
			d.log.Error(updateErr)
		}
		return nil, nil
	}

	d.log.Debug("HA-STATUS is HAStatusMaster, do nothing")
	return nil, nil
}

指向具体的 Handler 方法

err = d.vppHandler.VppUpdateNwi(value)

vppcalls

向 VPP API 发送信息的方法

初始化时定义了 UpfVppHandler,位于 /vpp-agent/plugins/vpp/upfplugin/vppcalls/vpp2009/vppcalls_handlers.go

执行 upf_vppcalls 中的方法时,通过上面定义的 UpfVppHandler 向 GoVPP API 发送信息

vpp-agent/plugins/vpp/upfplugin/vppcalls/vpp2009/upf_vppcalls.go

// VppDeleteNwi sends an UpfNwiAddDel request with IsAdd set to 0, populated
// from entry, over the handler's calls channel and waits for the reply.
//
// It returns nil on success, or an error that wraps the failure reported by
// the request/reply exchange.
func (h *UpfVppHandler) VppDeleteNwi(entry *models.UpfNwiAddDel) error {
	h.log.Debugf("Nwi %+v\n", entry)

	request := &upf.UpfNwiAddDel{}
	reply := &upf.UpfNwiAddDelReply{}

	// NOTE(review): IsAdd is set before Copy runs; if Copy also transfers an
	// IsAdd field from entry it would overwrite this value — confirm.
	request.IsAdd = 0
	Copy(request, entry)

	if err := h.callsChannel.SendRequest(request).ReceiveReply(reply); err != nil {
		// Name this function in the message; it previously said "VppUpdateNwi".
		return errors.Errorf("VppDeleteNwi error %v", err)
	}
	return nil
}