# VPP Agent 调用分析 VPP Agent 可以通过三种方式被调用,分别是 Restful Api,Grpc,KV 数据库 watched; ![call-analysis](../../../../_static/system_architecture.png) ## KV 数据库管理 通过直接操作 KV 数据库进行 VPP Agent 的管理,当用户通过 etcdctl 或者 redis-cli 对 VPP-Agent watch 的前缀资源进行修改时,etcd 或 redis 通过 channel 通知 vpp-agent 进行任务下发和操作。 示例通过 redis-cli 下发分析整个流程 ### Agent 初始化 /vpp-agent/cmd/vpp-agent/app/vpp_agent.go 定义 redis 数据同步,写入和 watch。定义 orchestrator 和 upf plugin 的 watch 和 Publsher 选择对应数据库。 ``` redisDataSync := kvdbsync.NewPlugin(kvdbsync.UseKV(&redis.DefaultPlugin)) writers := datasync.KVProtoWriters{ redisDataSync } watchers := datasync.KVProtoWatchers{ redisDataSync } orchestrator.DefaultPlugin.Watcher = watchers orchestrator.DefaultPlugin.StatusPublisher = writers upfplugin.DefaultPlugin.KVdbWatcher = watchers upfplugin.DefaultPlugin.RedisPlugin = redisDataSync ``` 启动后加载所有的 Plugin ### UPF Plugin 加载 UPF Plugin 时,创建 GoVppmux 的channel 和 handler vpp-agent/plugins/vpp/upfplugin/upfplugin.go ``` if p.vppCh, err = p.GoVppmux.NewAPIChannel(); err != nil { return errors.Errorf("failed to create GoVPP API channel: %v", err) } p.vpeHandler, err = vpevppcalls.NewHandler(p.GoVppmux) if err != nil { return errors.Errorf("VPP core handler error: %w", err) } ``` 注册 KV 的前缀并 watch vpp-agent/plugins/vpp/upfplugin/upfplugin.go ``` p.statusDescriptor = descriptor.NewHAStatusDescriptor(p.upfHandler, p.Log) p.associationDescriptor = descriptor.NewPfcpAssociationDescriptor(p.upfHandler, p.Log) p.pfdDescriptor = descriptor.NewPfcpPfdManagementDescriptor(p.upfHandler, p.Log) p.sessionDescriptor = descriptor.NewPfcpSessionDescriptor(p.upfHandler, p.Log) p.pdrDescriptor = descriptor.NewPfcpPdrDescriptor(p.upfHandler, p.Log) p.farDescriptor = descriptor.NewPfcpFarDescriptor(p.upfHandler, p.Log) p.qerDescriptor = descriptor.NewPfcpQerDescriptor(p.upfHandler, p.Log) p.urrDescriptor = descriptor.NewPfcpUrrDescriptor(p.upfHandler, p.Log) p.barDescriptor = descriptor.NewPfcpBarDescriptor(p.upfHandler, p.Log) p.gtpuendpointDescriptor = descriptor.NewGtpuEndpointDescriptor(p.upfHandler, p.Log) p.pfcpendpointDescriptor = descriptor.NewPfcpEndpointDescriptor(p.upfHandler, p.Log) p.nwiDescriptor = descriptor.NewNwiDescriptor(p.upfHandler, p.Log) err = p.Deps.Scheduler.RegisterKVDescriptor(p.*) p.vppNotificationCh.* = make(chan govppapi.Message, notificationChannelSize) if err := p.subscribeWatcher(); err != nil { return err } go p.watchVPPNotifications() ``` 其中 p.nwiDescriptor = descriptor.NewNwiDescriptor(p.upfHandler, p.Log) 定义了具体资源(Nwi)的实现方法,其中有对资源的操作。 其中 err = p.Deps.Scheduler.RegisterKVDescriptor(p.*) 是注册匹配前缀的实现方法 WatchVPPNotification vpp-agent/plugins/vpp/upfplugin/vpp_notification.go ``` toRedisTxn := &WaitTxn{Txn: p.RedisBroker.NewTxn(), Count: 0} for { select { case e := <-p.*: p.*(&e) ... } commited := toRedisTxn.Commit(false, p.Log) if commited == true { toRedisTxn = &WaitTxn{Txn: p.RedisBroker.NewTxn(), Count: 0} } } ``` 在启动后会做一次同步 vpp-agent/cmd/vpp-agent/app/vpp_agent.go ``` func (a *VPPAgent) AfterInit() error { // manually start resync after all plugins started resync.DefaultPlugin.DoResync() } ``` ### orchestrator #### orchestrator plugin orchestrator plugin 启动后,去 watchEvents vpp-agent/plugins/orchestrator/orchestrator.go ``` func (p *Plugin) AfterInit() (err error) { // watch datasync events p.wg.Add(1) go p.watchEvents() } ``` 当修改数据库后,通过 channel 告知修改 vpp-agent/plugins/orchestrator/orchestrator.go ``` func (p *Plugin) watchEvents() { for { select { case e := <-p.changeChan: p.log.Debugf("=> received CHANGE event (%v changes)", len(e.GetChanges())) ...... p.PushData(ctx, kvPairs) for _, v := range upfEvents { go p.handleUPFEvent(v.Prefix, v.Iter, e.GetContext(), &waitsession) } } } ``` #### PushData vpp-agent/plugins/orchestrator/dispatcher.go ``` seqID, err := txn.Commit(ctx) ``` 由 txn 进行 crud 的匹配执行对应的操作方法,调用 vpp-agents/plugins/kvscheduler/txn_exec.go ``` func (s *Scheduler) applyValue ``` 分析后,指向 vpp-agent/plugins/vpp/upfplugin/descriptor/nwi.go ``` func (d *NwiDescriptor) Create(key string, value *model.UpfNwiAddDel) (metadata interface{}, err error) { d.log.Debugf("NwiDescriptor Create: key %v value %+v\n", key, value) if GetHAStatus() == HAStatusMaster { d.log.Debug("HA-STATUS is HAStatusMaster, do nothing") return nil, nil } err = d.vppHandler.VppUpdateNwi(value) if err != nil { d.log.Error(err) } return nil, nil } ``` 指向具体的 Handler 方法 ``` err = d.vppHandler.VppUpdateNwi(value) ``` ### vppcalls 向 VPP API 发送信息的方法 初始化时,定义了UpfVppHandler */vpp-agent/plugins/vpp/upfplugin/vppcalls/vpp2009/vppcalls_handlers.go* 在执行 upf_vppcalls 是通过上面定义的 UpfVppHandler 向 govpp api 发送信息 vpp-agent/plugins/vppcalls/vpp2009/upf_vppcalls.go ``` func (h *UpfVppHandler) VppDeleteNwi(entry *models.UpfNwiAddDel) error { var err error h.log.Debugf("Nwi %+v\n", entry) request := &upf.UpfNwiAddDel{} reply := &upf.UpfNwiAddDelReply{} request.IsAdd = 0 Copy(request, entry) err = h.callsChannel.SendRequest(request).ReceiveReply(reply) if err != nil { return errors.Errorf("VppUpdateNwi error %v", err) } return nil } ```