VPP Agent 调用分析
VPP Agent 可以通过三种方式被调用,分别是 RESTful API、gRPC,以及对 KV 数据库的 watch。
KV 数据库管理
通过直接操作 KV 数据库进行 VPP Agent 的管理,当用户通过 etcdctl 或者 redis-cli 对 VPP-Agent watch 的前缀资源进行修改时,etcd 或 redis 通过 channel 通知 vpp-agent 进行任务下发和操作。
下面以通过 redis-cli 下发配置为例,分析整个流程。
Agent 初始化
/vpp-agent/cmd/vpp-agent/app/vpp_agent.go
定义 redis 的数据同步(写入和 watch),并为 orchestrator 和 upf plugin 的 Watcher 与 Publisher 选择对应的数据库。
// Build the KV data-sync plugin on top of the default Redis plugin; the same
// instance is used below for both writing and watching.
redisDataSync := kvdbsync.NewPlugin(kvdbsync.UseKV(&redis.DefaultPlugin))

// A multi-line composite literal needs a trailing comma after its last
// element, otherwise the snippet does not parse.
writers := datasync.KVProtoWriters{
	redisDataSync,
}
watchers := datasync.KVProtoWatchers{
	redisDataSync,
}

// The orchestrator watches through watchers and publishes status through writers.
orchestrator.DefaultPlugin.Watcher = watchers
orchestrator.DefaultPlugin.StatusPublisher = writers

// The UPF plugin receives the same watchers plus the Redis data-sync plugin itself.
upfplugin.DefaultPlugin.KVdbWatcher = watchers
upfplugin.DefaultPlugin.RedisPlugin = redisDataSync
启动后加载所有的 Plugin
UPF Plugin
加载 UPF Plugin 时,创建 GoVppmux 的 channel 和 handler
vpp-agent/plugins/vpp/upfplugin/upfplugin.go
if p.vppCh, err = p.GoVppmux.NewAPIChannel(); err != nil {
return errors.Errorf("failed to create GoVPP API channel: %v", err)
}
p.vpeHandler, err = vpevppcalls.NewHandler(p.GoVppmux)
if err != nil {
return errors.Errorf("VPP core handler error: %w", err)
}
注册 KV 的前缀并 watch
vpp-agent/plugins/vpp/upfplugin/upfplugin.go
p.statusDescriptor = descriptor.NewHAStatusDescriptor(p.upfHandler, p.Log)
p.associationDescriptor = descriptor.NewPfcpAssociationDescriptor(p.upfHandler, p.Log)
p.pfdDescriptor = descriptor.NewPfcpPfdManagementDescriptor(p.upfHandler, p.Log)
p.sessionDescriptor = descriptor.NewPfcpSessionDescriptor(p.upfHandler, p.Log)
p.pdrDescriptor = descriptor.NewPfcpPdrDescriptor(p.upfHandler, p.Log)
p.farDescriptor = descriptor.NewPfcpFarDescriptor(p.upfHandler, p.Log)
p.qerDescriptor = descriptor.NewPfcpQerDescriptor(p.upfHandler, p.Log)
p.urrDescriptor = descriptor.NewPfcpUrrDescriptor(p.upfHandler, p.Log)
p.barDescriptor = descriptor.NewPfcpBarDescriptor(p.upfHandler, p.Log)
p.gtpuendpointDescriptor = descriptor.NewGtpuEndpointDescriptor(p.upfHandler, p.Log)
p.pfcpendpointDescriptor = descriptor.NewPfcpEndpointDescriptor(p.upfHandler, p.Log)
p.nwiDescriptor = descriptor.NewNwiDescriptor(p.upfHandler, p.Log)
err = p.Deps.Scheduler.RegisterKVDescriptor(p.*)
p.vppNotificationCh.* = make(chan govppapi.Message, notificationChannelSize)
if err := p.subscribeWatcher(); err != nil {
return err
}
go p.watchVPPNotifications()
其中 p.nwiDescriptor = descriptor.NewNwiDescriptor(p.upfHandler, p.Log) 定义了具体资源(Nwi)的实现方法,其中有对资源的操作。
其中 err = p.Deps.Scheduler.RegisterKVDescriptor(p.*) 是注册匹配前缀的实现方法
watchVPPNotifications
vpp-agent/plugins/vpp/upfplugin/vpp_notification.go
toRedisTxn := &WaitTxn{Txn: p.RedisBroker.NewTxn(), Count: 0}
for {
select {
case e := <-p.*:
p.*(&e)
...
}
commited := toRedisTxn.Commit(false, p.Log)
if commited == true {
toRedisTxn = &WaitTxn{Txn: p.RedisBroker.NewTxn(), Count: 0}
}
}
在启动后会做一次同步
vpp-agent/cmd/vpp-agent/app/vpp_agent.go
// AfterInit triggers a resync by calling resync.DefaultPlugin.DoResync().
// NOTE(review): no return statement is shown although the signature returns
// error — this snippet appears to be abridged; confirm against the full file.
func (a *VPPAgent) AfterInit() error {
// manually start resync after all plugins started
resync.DefaultPlugin.DoResync()
}
orchestrator
orchestrator plugin
orchestrator plugin 启动后,会启动一个 goroutine 执行 watchEvents
vpp-agent/plugins/orchestrator/orchestrator.go
// AfterInit registers one unit on p.wg and starts p.watchEvents in its own
// goroutine.
// NOTE(review): no return statement is shown for the named result err — this
// snippet appears to be abridged; confirm against the full file.
func (p *Plugin) AfterInit() (err error) {
// watch datasync events
p.wg.Add(1)
go p.watchEvents()
}
数据库被修改后,变更事件会通过 channel(changeChan)通知到 watchEvents
vpp-agent/plugins/orchestrator/orchestrator.go
func (p *Plugin) watchEvents() {
for {
select {
case e := <-p.changeChan:
p.log.Debugf("=> received CHANGE event (%v changes)", len(e.GetChanges()))
......
p.PushData(ctx, kvPairs)
for _, v := range upfEvents {
go p.handleUPFEvent(v.Prefix, v.Iter, e.GetContext(), &waitsession)
}
}
}
PushData
vpp-agent/plugins/orchestrator/dispatcher.go
seqID, err := txn.Commit(ctx)
由 txn 按 CRUD 类型进行匹配,并执行对应的操作方法,调用:
vpp-agent/plugins/kvscheduler/txn_exec.go
func (s *Scheduler) applyValue
分析后,指向 vpp-agent/plugins/vpp/upfplugin/descriptor/nwi.go
// Create handles creation of an Nwi value for the given key.
//
// When GetHAStatus() reports HAStatusMaster nothing is sent to VPP. Otherwise
// the value is passed to the handler's VppUpdateNwi; a failure there is only
// logged. In every case the method returns nil metadata and a nil error.
func (d *NwiDescriptor) Create(key string, value *model.UpfNwiAddDel) (metadata interface{}, err error) {
	d.log.Debugf("NwiDescriptor Create: key %v value %+v\n", key, value)

	switch GetHAStatus() {
	case HAStatusMaster:
		d.log.Debug("HA-STATUS is HAStatusMaster, do nothing")
	default:
		// Errors from the VPP call are logged, not propagated to the caller.
		if callErr := d.vppHandler.VppUpdateNwi(value); callErr != nil {
			d.log.Error(callErr)
		}
	}
	return nil, nil
}
指向具体的 Handler 方法
err = d.vppHandler.VppUpdateNwi(value)
vppcalls
向 VPP API 发送信息的方法
初始化时定义了 UpfVppHandler,见 /vpp-agent/plugins/vpp/upfplugin/vppcalls/vpp2009/vppcalls_handlers.go
在执行 upf_vppcalls 中的方法时,通过上面定义的 UpfVppHandler 向 GoVPP API 发送信息
vpp-agent/plugins/vpp/upfplugin/vppcalls/vpp2009/upf_vppcalls.go
// VppDeleteNwi builds an UpfNwiAddDel request for entry, with IsAdd cleared to
// 0 before entry is copied in, sends it over the handler's calls channel and
// waits for the reply.
//
// It returns nil when the request/reply exchange succeeds, or an error that
// carries the failure reported by the exchange.
func (h *UpfVppHandler) VppDeleteNwi(entry *models.UpfNwiAddDel) error {
	h.log.Debugf("Nwi %+v\n", entry)

	request := &upf.UpfNwiAddDel{}
	reply := &upf.UpfNwiAddDelReply{}
	request.IsAdd = 0
	// NOTE(review): Copy runs after IsAdd is cleared; presumably it does not
	// overwrite IsAdd from entry — confirm against Copy's implementation.
	Copy(request, entry)

	if err := h.callsChannel.SendRequest(request).ReceiveReply(reply); err != nil {
		// Name this method (not VppUpdateNwi) so the error points at the delete path.
		return errors.Errorf("VppDeleteNwi error %v", err)
	}
	return nil
}